diff --git a/cloud/node/README.md b/cloud/node/README.md index 79f03fd..7248bc3 100644 --- a/cloud/node/README.md +++ b/cloud/node/README.md @@ -75,3 +75,15 @@ The content of each message payload is a combination of `my-message-` and a digi Sent message: my-message-8 Sent message: my-message-9 ``` + +4. Run the Node.js producer to publish messages to the topic `my-topic` by OAuth2. + + ```bash + cd cloud/node + export ISSUER_URL = "" + export PRIVATE_KEY = "" + export AUDIENCE = "" + export SERVICE_URL = "" + node sample_producer_by_oauth2.js + ``` + diff --git a/cloud/node/connect_by_oauth2.js b/cloud/node/connect_by_oauth2.js new file mode 100644 index 0000000..7698d00 --- /dev/null +++ b/cloud/node/connect_by_oauth2.js @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +const Pulsar = require('pulsar-client'); + +const issuer_url = process.env.ISSUER_URL; +const private_key = process.env.PRIVATE_KEY; +const audience = process.env.AUDIENCE; +const scope = process.env.SCOPE; +const service_url = process.env.SERVICE_URL; +const client_id = process.env.CLIENT_ID; +const client_secret = process.env.CLIENT_SECRET; + +(async () => { + const params = { + issuer_url: issuer_url + } + if (private_key.length > 0) { + params['private_key'] = private_key + } else { + params['client_id'] = client_id + params['client_secret'] = client_secret + } + if (audience.length > 0) { + params['audience'] = audience + } + if (scope.length > 0) { + params['scope'] = scope + } + const auth = new Pulsar.AuthenticationOauth2(params); + + // Create a client + const client = new Pulsar.Client({ + serviceUrl: service_url, + authentication: auth, + }); + + await client.close(); +})(); + \ No newline at end of file diff --git a/cloud/node/connect_by_toekn.js b/cloud/node/connect_by_token.js similarity index 100% rename from cloud/node/connect_by_toekn.js rename to cloud/node/connect_by_token.js diff --git a/cloud/node/sample_consumer_by_oauth2.js b/cloud/node/sample_consumer_by_oauth2.js new file mode 100644 index 0000000..36a9156 --- /dev/null +++ b/cloud/node/sample_consumer_by_oauth2.js @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +const Pulsar = require('pulsar-client'); + +const issuer_url = process.env.ISSUER_URL; +const private_key = process.env.PRIVATE_KEY; +const audience = process.env.AUDIENCE; +const scope = process.env.SCOPE; +const service_url = process.env.SERVICE_URL; +const client_id = process.env.CLIENT_ID; +const client_secret = process.env.CLIENT_SECRET; + +(async () => { + const params = { + issuer_url: issuer_url + } + if (private_key.length > 0) { + params['private_key'] = private_key + } else { + params['client_id'] = client_id + params['client_secret'] = client_secret + } + if (audience.length > 0) { + params['audience'] = audience + } + if (scope.length > 0) { + params['scope'] = scope + } + const auth = new Pulsar.AuthenticationOauth2(params); + + // Create a client + const client = new Pulsar.Client({ + serviceUrl: service_url, + authentication: auth, + operationTimeoutSeconds: 30, + }); + + // Create a consumer + const consumer = await client.subscribe({ + topic: 'persistent://public/default/my-topic', + subscription: 'sub1', + subscriptionType: 'Shared', + ackTimeoutMs: 10000, + }); + + // Receive messages + for (let i = 0; i < 10; i += 1) { + const msg = await consumer.receive(); + console.log(msg.getData().toString()); + consumer.acknowledge(msg); + } + + await consumer.close(); + await client.close(); +})(); diff --git a/cloud/node/sample_producer_by_oauth2.js b/cloud/node/sample_producer_by_oauth2.js new file mode 100644 index 0000000..f9cc605 --- /dev/null +++ b/cloud/node/sample_producer_by_oauth2.js @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +const Pulsar = require('pulsar-client'); + +const issuer_url = process.env.ISSUER_URL; +const private_key = process.env.PRIVATE_KEY; +const audience = process.env.AUDIENCE; +const scope = process.env.SCOPE; +const service_url = process.env.SERVICE_URL; +const client_id = process.env.CLIENT_ID; +const client_secret = process.env.CLIENT_SECRET; + +(async () => { + const params = { + issuer_url: issuer_url + } + if (private_key.length > 0) { + params['private_key'] = private_key + } else { + params['client_id'] = client_id + params['client_secret'] = client_secret + } + if (audience.length > 0) { + params['audience'] = audience + } + if (scope.length > 0) { + params['scope'] = scope + } + const auth = new Pulsar.AuthenticationOauth2(params); + + // Create a client + const client = new Pulsar.Client({ + serviceUrl: service_url, + authentication: auth, + operationTimeoutSeconds: 30, + }); + + // Create a producer + const producer = await client.createProducer({ + topic: 'persistent://public/default/my-topic', + sendTimeoutMs: 30000, + batchingEnabled: true, + }); + + // Send messages + for (let i = 0; i < 10; i += 1) { + const msg = `my-message-${i}`; + producer.send({ + data: Buffer.from(msg), + }); + console.log(`Sent message: ${msg}`); + } + await producer.flush(); + + await producer.close(); + await client.close(); +})();