From 580831dcba98c34e0caef1ab53b8b58399f16bb8 Mon Sep 17 00:00:00 2001 From: earthchen Date: Tue, 13 Jan 2026 15:00:19 +0800 Subject: [PATCH 1/4] opt triple backpress test --- .../dubbo-samples-triple-backpressure/pom.xml | 6 + .../backpressure/BackpressureProvider.java | 10 +- .../backpressure/EmbeddedZooKeeper.java | 305 ++++++++++++++++++ .../samples/backpressure/BackpressureIT.java | 17 +- 4 files changed, 321 insertions(+), 17 deletions(-) create mode 100644 2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/EmbeddedZooKeeper.java diff --git a/2-advanced/dubbo-samples-triple-backpressure/pom.xml b/2-advanced/dubbo-samples-triple-backpressure/pom.xml index c38992247..2cad1d88e 100644 --- a/2-advanced/dubbo-samples-triple-backpressure/pom.xml +++ b/2-advanced/dubbo-samples-triple-backpressure/pom.xml @@ -36,6 +36,7 @@ 3.3.7-SNAPSHOT 2.20.0 + 4.3.30.RELEASE @@ -49,6 +50,11 @@ dubbo-zookeeper-curator5-spring-boot-starter ${dubbo.version} + + org.springframework + spring-context-support + ${spring.version} + io.dropwizard.metrics diff --git a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/BackpressureProvider.java b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/BackpressureProvider.java index a4f214a8f..1a8bd504c 100644 --- a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/BackpressureProvider.java +++ b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/BackpressureProvider.java @@ -35,17 +35,21 @@ public class BackpressureProvider { private static final String ZOOKEEPER_HOST = System.getProperty("zookeeper.address", "127.0.0.1"); private static final String ZOOKEEPER_PORT = System.getProperty("zookeeper.port", "2181"); + public static final String ZK_ADDRESS = "zookeeper://" + ZOOKEEPER_HOST + ":" + ZOOKEEPER_PORT; + public static void main(String[] args) { + + new EmbeddedZooKeeper(Integer.parseInt(ZOOKEEPER_PORT), false).start(); + ServiceConfig service = new ServiceConfig<>(); service.setInterface(BackpressureService.class); service.setRef(new BackpressureServiceImpl()); - String zkAddress = "zookeeper://" + ZOOKEEPER_HOST + ":" + ZOOKEEPER_PORT; - LOGGER.info("Using ZooKeeper: {}", zkAddress); + LOGGER.info("Using ZooKeeper: {}", ZK_ADDRESS); DubboBootstrap bootstrap = DubboBootstrap.getInstance(); bootstrap.application(new ApplicationConfig("backpressure-provider")) - .registry(new RegistryConfig(zkAddress)) + .registry(new RegistryConfig(ZK_ADDRESS)) .protocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051)) .service(service) .start(); diff --git a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/EmbeddedZooKeeper.java b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/EmbeddedZooKeeper.java new file mode 100644 index 000000000..a6348b022 --- /dev/null +++ b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/EmbeddedZooKeeper.java @@ -0,0 +1,305 @@ + +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.samples.backpressure; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.ServerSocket; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.SmartLifecycle; +import org.springframework.util.ErrorHandler; + +/** + * from: https://github.com/spring-projects/spring-xd/blob/v1.3.1.RELEASE/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/zookeeper/ZooKeeperUtils.java + *

+ * Helper class to start an embedded instance of standalone (non clustered) ZooKeeper. + *

+ * NOTE: at least an external standalone server (if not an ensemble) are recommended, even for + * {@link org.springframework.xd.dirt.server.singlenode.SingleNodeApplication} + */ +public class EmbeddedZooKeeper implements SmartLifecycle { + + private static final Random RANDOM = new Random(); + + /** + * Logger. + */ + private static final Logger logger = LoggerFactory.getLogger(EmbeddedZooKeeper.class); + + /** + * ZooKeeper client port. This will be determined dynamically upon startup. + */ + private final int clientPort; + + /** + * Whether to auto-start. Default is true. + */ + private boolean autoStartup = true; + + /** + * Lifecycle phase. Default is 0. + */ + private int phase = 0; + + /** + * Thread for running the ZooKeeper server. + */ + private volatile Thread zkServerThread; + + /** + * ZooKeeper server. + */ + private volatile ZooKeeperServerMain zkServer; + + /** + * {@link ErrorHandler} to be invoked if an Exception is thrown from the ZooKeeper server thread. + */ + private ErrorHandler errorHandler; + + private boolean daemon = true; + + /** + * Construct an EmbeddedZooKeeper with a random port. + */ + public EmbeddedZooKeeper() { + clientPort = findRandomPort(30000, 65535); + } + + /** + * Construct an EmbeddedZooKeeper with the provided port. + * + * @param clientPort port for ZooKeeper server to bind to + */ + public EmbeddedZooKeeper(int clientPort, boolean daemon) { + this.clientPort = clientPort; + this.daemon = daemon; + } + + /** + * Returns the port that clients should use to connect to this embedded server. + * + * @return dynamically determined client port + */ + public int getClientPort() { + return this.clientPort; + } + + /** + * Specify whether to start automatically. Default is true. + * + * @param autoStartup whether to start automatically + */ + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isAutoStartup() { + return this.autoStartup; + } + + /** + * Specify the lifecycle phase for the embedded server. + * + * @param phase the lifecycle phase + */ + public void setPhase(int phase) { + this.phase = phase; + } + + /** + * {@inheritDoc} + */ + @Override + public int getPhase() { + return this.phase; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isRunning() { + return (zkServerThread != null); + } + + /** + * Start the ZooKeeper server in a background thread. + *

+ * Register an error handler via {@link #setErrorHandler} in order to handle + * any exceptions thrown during startup or execution. + */ + @Override + public synchronized void start() { + if (zkServerThread == null) { + zkServerThread = new Thread(new ServerRunnable(), "ZooKeeper Server Starter"); + zkServerThread.setDaemon(daemon); + zkServerThread.start(); + } + } + + /** + * Shutdown the ZooKeeper server. + */ + @Override + public synchronized void stop() { + if (zkServerThread != null) { + // The shutdown method is protected...thus this hack to invoke it. + // This will log an exception on shutdown; see + // https://issues.apache.org/jira/browse/ZOOKEEPER-1873 for details. + try { + Method shutdown = ZooKeeperServerMain.class.getDeclaredMethod("shutdown"); + shutdown.setAccessible(true); + shutdown.invoke(zkServer); + } catch (Exception e) { + throw new RuntimeException(e); + } + + // It is expected that the thread will exit after + // the server is shutdown; this will block until + // the shutdown is complete. + try { + zkServerThread.join(5000); + zkServerThread = null; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("Interrupted while waiting for embedded ZooKeeper to exit"); + // abandoning zk thread + zkServerThread = null; + } + } + } + + /** + * Stop the server if running and invoke the callback when complete. + */ + @Override + public void stop(Runnable callback) { + stop(); + callback.run(); + } + + /** + * Provide an {@link ErrorHandler} to be invoked if an Exception is thrown from the ZooKeeper server thread. If none + * is provided, only error-level logging will occur. + * + * @param errorHandler the {@link ErrorHandler} to be invoked + */ + public void setErrorHandler(ErrorHandler errorHandler) { + this.errorHandler = errorHandler; + } + + /** + * Runnable implementation that starts the ZooKeeper server. + */ + private class ServerRunnable implements Runnable { + + @Override + public void run() { + try { + Properties properties = new Properties(); + File file = new File(System.getProperty("java.io.tmpdir") + + File.separator + UUID.randomUUID()); + file.deleteOnExit(); + properties.setProperty("dataDir", file.getAbsolutePath()); + properties.setProperty("clientPort", String.valueOf(clientPort)); + + QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig(); + quorumPeerConfig.parseProperties(properties); + + zkServer = new ZooKeeperServerMain(); + ServerConfig configuration = new ServerConfig(); + configuration.readFrom(quorumPeerConfig); + + System.setProperty("zookeeper.admin.enableServer", "false"); + + zkServer.runFromConfig(configuration); + } catch (Exception e) { + if (errorHandler != null) { + errorHandler.handleError(e); + } else { + logger.error("Exception running embedded ZooKeeper", e); + } + } + } + } + + /** + * Workaround for SocketUtils.findRandomPort() deprecation. + * + * @param min min port + * @param max max port + * @return a random generated available port + */ + private static int findRandomPort(int min, int max) { + if (min < 1024) { + throw new IllegalArgumentException("Max port shouldn't be less than 1024."); + } + + if (max > 65535) { + throw new IllegalArgumentException("Max port shouldn't be greater than 65535."); + } + + if (min > max) { + throw new IllegalArgumentException("Min port shouldn't be greater than max port."); + } + + int port = 0; + int counter = 0; + + // Workaround for legacy JDK doesn't support Random.nextInt(min, max). + List randomInts = RANDOM.ints(min, max + 1) + .limit(max - min) + .mapToObj(Integer::valueOf) + .collect(Collectors.toList()); + + do { + if (counter > max - min) { + throw new IllegalStateException("Unable to find a port between " + min + "-" + max); + } + + port = randomInts.get(counter); + counter++; + } while (isPortInUse(port)); + + return port; + } + + private static boolean isPortInUse(int port) { + try (ServerSocket ignored = new ServerSocket(port)) { + return false; + } catch (IOException e) { + // continue + } + return true; + } +} diff --git a/2-advanced/dubbo-samples-triple-backpressure/src/test/java/org/apache/dubbo/samples/backpressure/BackpressureIT.java b/2-advanced/dubbo-samples-triple-backpressure/src/test/java/org/apache/dubbo/samples/backpressure/BackpressureIT.java index 752e61415..6be523671 100644 --- a/2-advanced/dubbo-samples-triple-backpressure/src/test/java/org/apache/dubbo/samples/backpressure/BackpressureIT.java +++ b/2-advanced/dubbo-samples-triple-backpressure/src/test/java/org/apache/dubbo/samples/backpressure/BackpressureIT.java @@ -16,6 +16,7 @@ */ package org.apache.dubbo.samples.backpressure; +import org.apache.dubbo.common.constants.CommonConstants; import org.apache.dubbo.common.stream.ClientCallStreamObserver; import org.apache.dubbo.common.stream.ClientResponseObserver; import org.apache.dubbo.common.stream.StreamObserver; @@ -78,37 +79,25 @@ public class BackpressureIT { private static final Logger LOGGER = LoggerFactory.getLogger(BackpressureIT.class); - // Use random port to avoid conflicts between consecutive test runs - private static final int PORT = 50052 + (int)(Math.random() * 1000); - private static BackpressureService service; private static DubboBootstrap bootstrap; @BeforeClass public static void setup() { - // Provider config - ServiceConfig serviceConfig = new ServiceConfig<>(); - serviceConfig.setInterface(BackpressureService.class); - serviceConfig.setRef(new BackpressureServiceImpl()); - // Consumer config with direct connection (no registry needed) ReferenceConfig reference = new ReferenceConfig<>(); reference.setInterface(BackpressureService.class); - reference.setUrl("tri://127.0.0.1:" + PORT); + reference.setProtocol(CommonConstants.TRIPLE); reference.setTimeout(60000); // Start both provider and consumer using newInstance to avoid singleton pollution bootstrap = DubboBootstrap.getInstance(); bootstrap.application(new ApplicationConfig("backpressure-test")) - .registry(new RegistryConfig("N/A")) // No registry needed - .protocol(new ProtocolConfig("tri", PORT)) - .service(serviceConfig) + .registry(new RegistryConfig(BackpressureProvider.ZK_ADDRESS)) // No registry needed .reference(reference) .start(); service = reference.get(); - LOGGER.info("Provider and Consumer started on port {}", PORT); - // Warm up the connection to ensure resources are properly initialized // This prevents timing issues when running individual tests try { From e1796478d31a374395ce46d5be602725f349fe86 Mon Sep 17 00:00:00 2001 From: earthchen Date: Tue, 13 Jan 2026 15:24:56 +0800 Subject: [PATCH 2/4] fix --- .../dubbo-samples-triple-backpressure/pom.xml | 6 ---- .../backpressure/EmbeddedZooKeeper.java | 33 ++----------------- 2 files changed, 3 insertions(+), 36 deletions(-) diff --git a/2-advanced/dubbo-samples-triple-backpressure/pom.xml b/2-advanced/dubbo-samples-triple-backpressure/pom.xml index 2cad1d88e..c38992247 100644 --- a/2-advanced/dubbo-samples-triple-backpressure/pom.xml +++ b/2-advanced/dubbo-samples-triple-backpressure/pom.xml @@ -36,7 +36,6 @@ 3.3.7-SNAPSHOT 2.20.0 - 4.3.30.RELEASE @@ -50,11 +49,6 @@ dubbo-zookeeper-curator5-spring-boot-starter ${dubbo.version} - - org.springframework - spring-context-support - ${spring.version} - io.dropwizard.metrics diff --git a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/EmbeddedZooKeeper.java b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/EmbeddedZooKeeper.java index a6348b022..a0406ecd9 100644 --- a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/EmbeddedZooKeeper.java +++ b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/EmbeddedZooKeeper.java @@ -32,8 +32,6 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.context.SmartLifecycle; -import org.springframework.util.ErrorHandler; /** * from: https://github.com/spring-projects/spring-xd/blob/v1.3.1.RELEASE/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/zookeeper/ZooKeeperUtils.java @@ -43,7 +41,7 @@ * NOTE: at least an external standalone server (if not an ensemble) are recommended, even for * {@link org.springframework.xd.dirt.server.singlenode.SingleNodeApplication} */ -public class EmbeddedZooKeeper implements SmartLifecycle { +public class EmbeddedZooKeeper { private static final Random RANDOM = new Random(); @@ -77,11 +75,6 @@ public class EmbeddedZooKeeper implements SmartLifecycle { */ private volatile ZooKeeperServerMain zkServer; - /** - * {@link ErrorHandler} to be invoked if an Exception is thrown from the ZooKeeper server thread. - */ - private ErrorHandler errorHandler; - private boolean daemon = true; /** @@ -122,7 +115,6 @@ public void setAutoStartup(boolean autoStartup) { /** * {@inheritDoc} */ - @Override public boolean isAutoStartup() { return this.autoStartup; } @@ -139,7 +131,6 @@ public void setPhase(int phase) { /** * {@inheritDoc} */ - @Override public int getPhase() { return this.phase; } @@ -147,7 +138,6 @@ public int getPhase() { /** * {@inheritDoc} */ - @Override public boolean isRunning() { return (zkServerThread != null); } @@ -158,7 +148,6 @@ public boolean isRunning() { * Register an error handler via {@link #setErrorHandler} in order to handle * any exceptions thrown during startup or execution. */ - @Override public synchronized void start() { if (zkServerThread == null) { zkServerThread = new Thread(new ServerRunnable(), "ZooKeeper Server Starter"); @@ -170,7 +159,6 @@ public synchronized void start() { /** * Shutdown the ZooKeeper server. */ - @Override public synchronized void stop() { if (zkServerThread != null) { // The shutdown method is protected...thus this hack to invoke it. @@ -202,21 +190,11 @@ public synchronized void stop() { /** * Stop the server if running and invoke the callback when complete. */ - @Override public void stop(Runnable callback) { stop(); callback.run(); } - /** - * Provide an {@link ErrorHandler} to be invoked if an Exception is thrown from the ZooKeeper server thread. If none - * is provided, only error-level logging will occur. - * - * @param errorHandler the {@link ErrorHandler} to be invoked - */ - public void setErrorHandler(ErrorHandler errorHandler) { - this.errorHandler = errorHandler; - } /** * Runnable implementation that starts the ZooKeeper server. @@ -227,8 +205,7 @@ private class ServerRunnable implements Runnable { public void run() { try { Properties properties = new Properties(); - File file = new File(System.getProperty("java.io.tmpdir") - + File.separator + UUID.randomUUID()); + File file = new File(System.getProperty("java.io.tmpdir") + File.separator + UUID.randomUUID()); file.deleteOnExit(); properties.setProperty("dataDir", file.getAbsolutePath()); properties.setProperty("clientPort", String.valueOf(clientPort)); @@ -244,11 +221,7 @@ public void run() { zkServer.runFromConfig(configuration); } catch (Exception e) { - if (errorHandler != null) { - errorHandler.handleError(e); - } else { - logger.error("Exception running embedded ZooKeeper", e); - } + logger.error("Exception running embedded ZooKeeper", e); } } } From 55b082f0482cb11897b531d691b817642fc5348c Mon Sep 17 00:00:00 2001 From: earthchen Date: Wed, 14 Jan 2026 13:47:00 +0800 Subject: [PATCH 3/4] opt tri backpressure test user embed zk --- .../case-configuration.yml | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/2-advanced/dubbo-samples-triple-backpressure/case-configuration.yml b/2-advanced/dubbo-samples-triple-backpressure/case-configuration.yml index 396ffb97b..ed115d52d 100644 --- a/2-advanced/dubbo-samples-triple-backpressure/case-configuration.yml +++ b/2-advanced/dubbo-samples-triple-backpressure/case-configuration.yml @@ -13,20 +13,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - services: - zookeeper: - image: zookeeper:latest - dubbo-samples-triple-backpressure: type: app basedir: . mainClass: org.apache.dubbo.samples.backpressure.BackpressureProvider - systemProps: - - zookeeper.address=zookeeper - - zookeeper.port=2181 - waitPortsBeforeRun: - - zookeeper:2181 checkPorts: - 50051 checkLog: "BackpressureProvider started" @@ -37,12 +28,12 @@ services: tests: - "**/*IT.class" systemProps: - - zookeeper.address=zookeeper + - zookeeper.host=dubbo-samples-triple-backpressure - zookeeper.port=2181 - dubbo.address=dubbo-samples-triple-backpressure - dubbo.port=50051 waitPortsBeforeRun: - - zookeeper:2181 - dubbo-samples-triple-backpressure:50051 + - dubbo-samples-triple-backpressure:2181 depends_on: - dubbo-samples-triple-backpressure From e02412806c81ba578c9ae95f96a1310a3d3b5a46 Mon Sep 17 00:00:00 2001 From: earthchen Date: Thu, 15 Jan 2026 12:46:53 +0800 Subject: [PATCH 4/4] opt tri backpressure test user embed zk --- .../dubbo-samples-triple-backpressure/case-configuration.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/2-advanced/dubbo-samples-triple-backpressure/case-configuration.yml b/2-advanced/dubbo-samples-triple-backpressure/case-configuration.yml index ed115d52d..f0c04da5a 100644 --- a/2-advanced/dubbo-samples-triple-backpressure/case-configuration.yml +++ b/2-advanced/dubbo-samples-triple-backpressure/case-configuration.yml @@ -19,6 +19,7 @@ services: basedir: . mainClass: org.apache.dubbo.samples.backpressure.BackpressureProvider checkPorts: + - 2181 - 50051 checkLog: "BackpressureProvider started" @@ -28,7 +29,7 @@ services: tests: - "**/*IT.class" systemProps: - - zookeeper.host=dubbo-samples-triple-backpressure + - zookeeper.address=dubbo-samples-triple-backpressure - zookeeper.port=2181 - dubbo.address=dubbo-samples-triple-backpressure - dubbo.port=50051