From 181682b961c00ee22ce7fb39fdc6cb913958719b Mon Sep 17 00:00:00 2001 From: rythm-sachdeva Date: Sat, 24 Jan 2026 14:41:06 +0530 Subject: [PATCH] feat(java): Implemented Async Connection Pooling using FixedChannelPool fix(java-sdk):Added Missing PoolMetrics and linting issues fix(java):fixed connection error fix(java):Fixed Linting Issues fix(java):fixed builder issues fix(java):fixed ci issues fix(java): fixed memory leaks fix(java):fixed memory leak at channelread0 fix(java):fixed broadcastasync memory leak --- .../client/async/tcp/AsyncIggyTcpClient.java | 15 +- .../client/async/tcp/AsyncTcpConnection.java | 394 ++++++++++++------ .../iggy/client/async/tcp/PoolMetrics.java | 212 ++++++++++ .../iggy/client/async/tcp/UsersTcpClient.java | 30 +- .../async/AsyncClientIntegrationTest.java | 24 ++ 5 files changed, 539 insertions(+), 136 deletions(-) create mode 100644 foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/PoolMetrics.java diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java index 4729f01459..ee29ccc97a 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java @@ -24,6 +24,7 @@ import org.apache.iggy.client.async.StreamsClient; import org.apache.iggy.client.async.TopicsClient; import org.apache.iggy.client.async.UsersClient; +import org.apache.iggy.client.async.tcp.AsyncTcpConnection.TCPConnectionPoolConfig; import org.apache.iggy.config.RetryPolicy; import org.apache.iggy.exception.IggyMissingCredentialsException; import org.apache.iggy.exception.IggyNotConnectedException; @@ -158,7 +159,15 @@ public static AsyncIggyTcpClientBuilder builder() { * @return a {@link CompletableFuture} that completes when the connection is established */ public CompletableFuture connect() { - connection = new AsyncTcpConnection(host, port, enableTls, tlsCertificate); + TCPConnectionPoolConfig.Builder poolConfigBuilder = new TCPConnectionPoolConfig.Builder(); + if (connectionPoolSize.isPresent()) { + poolConfigBuilder.setMaxConnections(connectionPoolSize.get()); + } + if (connectionTimeout.isPresent()) { + poolConfigBuilder.setAcquireTimeoutMillis(connectionTimeout.get().toMillis()); + } + TCPConnectionPoolConfig poolConfig = poolConfigBuilder.build(); + connection = new AsyncTcpConnection(host, port, enableTls, tlsCertificate, poolConfig); return connection.connect().thenRun(() -> { messagesClient = new MessagesTcpClient(connection); consumerGroupsClient = new ConsumerGroupsTcpClient(connection); @@ -204,6 +213,10 @@ public UsersClient users() { return usersClient; } + public PoolMetrics getTcpConnectionMetrics() { + return this.connection.getMetrics(); + } + /** * Returns the async messages client for producing and consuming messages. * diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java index d316fcf1c8..3a430c7452 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java @@ -24,16 +24,19 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; +import io.netty.channel.pool.AbstractChannelPoolHandler; +import io.netty.channel.pool.ChannelHealthChecker; +import io.netty.channel.pool.FixedChannelPool; +import io.netty.channel.pool.SimpleChannelPool; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; +import io.netty.util.concurrent.FutureListener; import org.apache.iggy.exception.IggyNotConnectedException; import org.apache.iggy.exception.IggyServerException; import org.apache.iggy.exception.IggyTlsException; @@ -42,10 +45,13 @@ import javax.net.ssl.SSLException; import java.io.File; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; +import java.util.Queue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; /** * Async TCP connection using Netty for non-blocking I/O. @@ -61,21 +67,31 @@ public class AsyncTcpConnection { private final SslContext sslContext; private final EventLoopGroup eventLoopGroup; private final Bootstrap bootstrap; - private Channel channel; - private final AtomicLong requestIdGenerator = new AtomicLong(0); - private final ConcurrentHashMap> pendingRequests = new ConcurrentHashMap<>(); + private SimpleChannelPool channelPool; + private final TCPConnectionPoolConfig poolConfig; + private final PoolMetrics poolMetrics; + + private final AtomicBoolean isClosed = new AtomicBoolean(false); public AsyncTcpConnection(String host, int port) { - this(host, port, false, Optional.empty()); + this(host, port, false, Optional.empty(), new TCPConnectionPoolConfig(5, 1000, 1000)); } - public AsyncTcpConnection(String host, int port, boolean enableTls, Optional tlsCertificate) { + public AsyncTcpConnection( + String host, + int port, + boolean enableTls, + Optional tlsCertificate, + TCPConnectionPoolConfig poolConfig) { this.host = host; this.port = port; this.enableTls = enableTls; this.tlsCertificate = tlsCertificate; + this.poolConfig = poolConfig; this.eventLoopGroup = new NioEventLoopGroup(); this.bootstrap = new Bootstrap(); + this.poolMetrics = new PoolMetrics(); + if (this.enableTls) { try { SslContextBuilder builder = SslContextBuilder.forClient(); @@ -95,45 +111,88 @@ private void configureBootstrap() { .group(eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .option(ChannelOption.SO_KEEPALIVE, true) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) { - ChannelPipeline pipeline = ch.pipeline(); - - if (enableTls) { - pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(), host, port)); - } + .remoteAddress(this.host, this.port); + } - // Custom frame decoder for Iggy protocol responses - pipeline.addLast("frameDecoder", new IggyFrameDecoder()); + /** + * Initialises Connection pool. + */ + public CompletableFuture connect() { + if (isClosed.get()) { + return CompletableFuture.failedFuture(new IllegalStateException("Client is Closed")); + } + AbstractChannelPoolHandler poolHandler = new AbstractChannelPoolHandler() { + @Override + public void channelCreated(Channel ch) { + ChannelPipeline pipeline = ch.pipeline(); + if (enableTls) { + // adding ssl if ssl enabled + pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(), host, port)); + } + // Adding the FrameDecoder to end of channel pipeline + pipeline.addLast("frameDecoder", new IggyFrameDecoder()); - // No encoder needed - we build complete frames following Iggy protocol - // The protocol already includes the length field, so adding an encoder - // would duplicate it. This matches the blocking client implementation. + // Adding Response Handler Now Statefull + pipeline.addLast("responseHandler", new IggyResponseHandler()); + } - // Response handler - pipeline.addLast("responseHandler", new IggyResponseHandler(pendingRequests)); - } - }); + @Override + public void channelAcquired(Channel ch) { + IggyResponseHandler handler = ch.pipeline().get(IggyResponseHandler.class); + handler.setPool(channelPool); + } + }; + + this.channelPool = new FixedChannelPool( + bootstrap, + poolHandler, + ChannelHealthChecker.ACTIVE, // Check If the connection is Active Before Lending + FixedChannelPool.AcquireTimeoutAction.FAIL, // Fail If we take too long + poolConfig.getAcquireTimeoutMillis(), + poolConfig.getMaxConnections(), + poolConfig.getMaxPendingAcquires()); + log.info("Connection pool initialized with max connections: {}", poolConfig.getMaxConnections()); + return CompletableFuture.completedFuture(null); } /** - * Connects to the server asynchronously. + * Returns Pool metrics. */ - public CompletableFuture connect() { - CompletableFuture future = new CompletableFuture<>(); + public PoolMetrics getMetrics() { + return this.poolMetrics; + } - bootstrap.connect(host, port).addListener((ChannelFutureListener) channelFuture -> { - if (channelFuture.isSuccess()) { - channel = channelFuture.channel(); - future.complete(null); - } else { - future.completeExceptionally(channelFuture.cause()); + /** + * BroadCasts Command to each connection + * (Mainly for login so that each connection in the pool is Authenticated) + * Returns the result of the LAST connection's execution, allowing the caller + * to treat this like a single request. + */ + public CompletableFuture broadcastAsync(int commandCode, ByteBuf payload) { + try { + if (isClosed.get()) { + return CompletableFuture.failedFuture(new IllegalStateException("Client is closed")); } - }); + if (channelPool == null) { + return CompletableFuture.failedFuture( + new IllegalStateException("Client not connected call connect first")); + } + List> lastFuture = new ArrayList<>(); + + int poolSize = poolConfig.getMaxConnections(); - return future; + for (int i = 0; i < poolSize; i++) { + lastFuture.add(send(commandCode, payload.retainedDuplicate())); + } + for (int i = 0; i < poolSize - 1; i++) { + lastFuture.get(i).thenAccept(response -> response.release()); + } + return lastFuture.get(lastFuture.size() - 1); + } finally { + payload.release(); + } } /** @@ -141,63 +200,97 @@ public CompletableFuture connect() { * Uses Netty's EventLoop to ensure thread-safe sequential request processing with FIFO response matching. */ public CompletableFuture send(int commandCode, ByteBuf payload) { - if (channel == null || !channel.isActive()) { - payload.release(); + if (isClosed.get()) { + return CompletableFuture.failedFuture( + new IggyNotConnectedException("Connection not established or closed")); + } + if (channelPool == null) { return CompletableFuture.failedFuture( new IggyNotConnectedException("Connection not established or closed")); } + // Starting the response clock + long starttime = System.nanoTime(); + CompletableFuture responseFuture = new CompletableFuture<>(); - // Execute on channel's EventLoop to ensure sequential processing - // This is necessary because Iggy protocol doesn't include request IDs, - // and responses are matched using FIFO order - channel.eventLoop().execute(() -> { - // Since Iggy doesn't use request IDs, we'll just use a simple queue - // Each request will get the next response in order - long requestId = requestIdGenerator.incrementAndGet(); - pendingRequests.put(requestId, responseFuture); - - // Build the request frame exactly like the blocking client - // Frame format: [payload_size:4][command:4][payload:N] - // where payload_size = 4 (command size) + N (payload size) - int payloadSize = payload.readableBytes(); - int framePayloadSize = 4 + payloadSize; // command (4 bytes) + payload - - ByteBuf frame = channel.alloc().buffer(4 + framePayloadSize); - frame.writeIntLE(framePayloadSize); // Length field (includes command) - frame.writeIntLE(commandCode); // Command - frame.writeBytes(payload, payload.readerIndex(), payloadSize); // Payload - - // Debug: print frame bytes - if (log.isTraceEnabled()) { - byte[] frameBytes = new byte[Math.min(frame.readableBytes(), 30)]; - frame.getBytes(0, frameBytes); - StringBuilder hex = new StringBuilder(); - for (byte b : frameBytes) { - hex.append(String.format("%02x ", b)); - } - log.trace( - "Sending frame with command: {}, payload size: {}, frame payload size (with command): {}, total frame size: {}", - commandCode, - payloadSize, - framePayloadSize, - frame.readableBytes()); - log.trace("Frame bytes (hex): {}", hex.toString()); + channelPool.acquire().addListener((FutureListener) f -> { + + // Stoping the watch to record waitime + long waitTime = System.nanoTime() - starttime; + poolMetrics.recordWaitTime(waitTime); + + if (!f.isSuccess()) { + poolMetrics.recordError(); + responseFuture.completeExceptionally(f.cause()); + return; } - payload.release(); + // Connection Aquired + poolMetrics.incrementActive(); - // Send the frame - channel.writeAndFlush(frame).addListener((ChannelFutureListener) future -> { - if (!future.isSuccess()) { - log.error("Failed to send frame: {}", future.cause().getMessage()); - pendingRequests.remove(requestId); - responseFuture.completeExceptionally(future.cause()); - } else { - log.trace("Frame sent successfully to {}", channel.remoteAddress()); + Channel channel = f.getNow(); + try { + IggyResponseHandler handler = channel.pipeline().get(IggyResponseHandler.class); + + CompletableFuture trackedFuture = responseFuture.whenComplete((res, ex) -> { + poolMetrics.decrementActive(); + }); + if (handler == null) { + throw new IllegalStateException("Channel missing IggyResponseHandler"); } - }); + + // Enqueuing request so handler knows who to call back; + handler.enqueueRequest(responseFuture); + + // Build the request frame exactly like the blocking client + // Frame format: [payload_size:4][command:4][payload:N] + // where payload_size = 4 (command size) + N (payload size) + int payloadSize = payload.readableBytes(); + int framePayloadSize = 4 + payloadSize; // command (4 bytes) + payload + + ByteBuf frame = channel.alloc().buffer(4 + framePayloadSize); + frame.writeIntLE(framePayloadSize); // Length field (includes command) + frame.writeIntLE(commandCode); // Command + frame.writeBytes(payload, payload.readerIndex(), payloadSize); // Payload + + // Debug: print frame bytes + byte[] frameBytes = new byte[Math.min(frame.readableBytes(), 30)]; + if (log.isTraceEnabled()) { + frame.getBytes(0, frameBytes); + StringBuilder hex = new StringBuilder(); + for (byte b : frameBytes) { + hex.append(String.format("%02x ", b)); + } + log.trace( + "Sending frame with command: {}, payload size: {}, frame payload size (with command): {}, total frame size: {}", + commandCode, + payloadSize, + framePayloadSize, + frame.readableBytes()); + log.trace("Frame bytes (hex): {}", hex.toString()); + } + + payload.release(); + + // Send the frame + channel.writeAndFlush(frame).addListener((ChannelFutureListener) future -> { + if (!future.isSuccess()) { + log.error("Failed to send frame: {}", future.cause().getMessage()); + frame.release(); + channel.close(); + channelPool.release(channel); + poolMetrics.recordError(); + responseFuture.completeExceptionally(future.cause()); + } else { + log.trace("Frame sent successfully to {}", channel.remoteAddress()); + } + }); + + } catch (RuntimeException e) { + channelPool.release(channel); + responseFuture.completeExceptionally(e); + } }); return responseFuture; @@ -207,33 +300,32 @@ public CompletableFuture send(int commandCode, ByteBuf payload) { * Closes the connection and releases resources. */ public CompletableFuture close() { - CompletableFuture future = new CompletableFuture<>(); - - if (channel != null && channel.isActive()) { - channel.close().addListener((ChannelFutureListener) channelFuture -> { - eventLoopGroup.shutdownGracefully(); - if (channelFuture.isSuccess()) { - future.complete(null); - } else { - future.completeExceptionally(channelFuture.cause()); - } - }); - } else { - eventLoopGroup.shutdownGracefully(); - future.complete(null); + if (isClosed.compareAndSet(false, true)) { + if (channelPool != null) { + channelPool.close(); + } + return CompletableFuture.runAsync(eventLoopGroup::shutdownGracefully); } - - return future; + return CompletableFuture.completedFuture(null); } /** * Response handler that correlates responses with requests. */ private static class IggyResponseHandler extends SimpleChannelInboundHandler { - private final ConcurrentHashMap> pendingRequests; + private final Queue> responseQueue = new ConcurrentLinkedQueue<>(); + private SimpleChannelPool pool; + + public IggyResponseHandler() { + this.pool = null; + } + + public void setPool(SimpleChannelPool pool) { + this.pool = pool; + } - public IggyResponseHandler(ConcurrentHashMap> pendingRequests) { - this.pendingRequests = pendingRequests; + public void enqueueRequest(CompletableFuture future) { + responseQueue.add(future); } @Override @@ -242,36 +334,94 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { int status = msg.readIntLE(); int length = msg.readIntLE(); - // Since Iggy doesn't use request IDs, we process responses in order - // Get the oldest pending request - if (!pendingRequests.isEmpty()) { - Long oldestRequestId = - pendingRequests.keySet().stream().min(Long::compare).orElse(null); + CompletableFuture future = responseQueue.poll(); - if (oldestRequestId != null) { - CompletableFuture future = pendingRequests.remove(oldestRequestId); + if (future != null) { - if (status == 0) { - // Success - pass the remaining buffer as response - future.complete(msg.retainedSlice()); - } else { - // Error - the payload contains the error message - byte[] errorBytes = length > 0 ? new byte[length] : new byte[0]; - if (length > 0) { - msg.readBytes(errorBytes); - } + if (status == 0) { + // Success - pass the remaining buffer as response + future.complete(msg.retainedSlice()); + } else { + // Error - the payload contains the error message + if (length > 0) { + byte[] errorBytes = new byte[length]; + msg.readBytes(errorBytes); future.completeExceptionally(IggyServerException.fromTcpResponse(status, errorBytes)); + } else { + future.completeExceptionally(new IggyServerException(status)); } } + } else { + log.error( + "Received response on channel {} but no request was waiting!", + ctx.channel().id()); + } + if (pool != null) { + pool.release(ctx.channel()); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - // Fail all pending requests - pendingRequests.values().forEach(future -> future.completeExceptionally(cause)); - pendingRequests.clear(); + // If the connection dies, fail ALL waiting requests for this connection + CompletableFuture f; + while ((f = responseQueue.poll()) != null) { + f.completeExceptionally(cause); + } ctx.close(); } } + + // Inner Class for Channel pool configurations + public static class TCPConnectionPoolConfig { + private final int maxConnections; + private final int maxPendingAcquires; + private final long acquireTimeoutMillis; + + public TCPConnectionPoolConfig(int maxConnections, int maxPendingAcquires, long acquireTimeoutMillis) { + this.maxConnections = maxConnections; + this.maxPendingAcquires = maxPendingAcquires; + this.acquireTimeoutMillis = acquireTimeoutMillis; + } + + public int getMaxConnections() { + return this.maxConnections; + } + + public int getMaxPendingAcquires() { + return this.maxPendingAcquires; + } + + public long getAcquireTimeoutMillis() { + return this.acquireTimeoutMillis; + } + + // Builder Class for TCPConnectionPoolConfig + public static final class Builder { + private int maxConnections = 5; + private int maxPendingAcquires = 1000; + private long acquireTimeoutMillis = 5000; + + public Builder() {} + + public Builder setMaxConnections(int maxConnections) { + this.maxConnections = maxConnections; + return this; + } + + public Builder setMaxPendingAcquires(int maxPendingAcquires) { + this.maxPendingAcquires = maxPendingAcquires; + return this; + } + + public Builder setAcquireTimeoutMillis(long acquireTimeoutMillis) { + this.acquireTimeoutMillis = acquireTimeoutMillis; + return this; + } + + public TCPConnectionPoolConfig build() { + return new TCPConnectionPoolConfig(maxConnections, maxPendingAcquires, acquireTimeoutMillis); + } + } + } } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/PoolMetrics.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/PoolMetrics.java new file mode 100644 index 0000000000..6c99b2476d --- /dev/null +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/PoolMetrics.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.client.async.tcp; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Metrics for monitoring connection pool performance. + * Tracks active connections, wait times, and errors. + */ +public class PoolMetrics { + private final AtomicInteger activeConnections = new AtomicInteger(0); + private final AtomicLong totalWaitTimeNanos = new AtomicLong(0); + private final AtomicLong totalRequests = new AtomicLong(0); + private final AtomicLong totalErrors = new AtomicLong(0); + private final AtomicLong minWaitTimeNanos = new AtomicLong(Long.MAX_VALUE); + private final AtomicLong maxWaitTimeNanos = new AtomicLong(0); + + /** + * Increments the count of active connections. + */ + public void incrementActive() { + activeConnections.incrementAndGet(); + } + + /** + * Decrements the count of active connections. + */ + public void decrementActive() { + activeConnections.decrementAndGet(); + } + + /** + * Records the wait time for acquiring a connection from the pool. + * + * @param waitTimeNanos wait time in nanoseconds + */ + public void recordWaitTime(long waitTimeNanos) { + totalRequests.incrementAndGet(); + totalWaitTimeNanos.addAndGet(waitTimeNanos); + + // Update min wait time + long currentMin; + do { + currentMin = minWaitTimeNanos.get(); + if (waitTimeNanos >= currentMin) { + break; + } + } while (!minWaitTimeNanos.compareAndSet(currentMin, waitTimeNanos)); + + // Update max wait time + long currentMax; + do { + currentMax = maxWaitTimeNanos.get(); + if (waitTimeNanos <= currentMax) { + break; + } + } while (!maxWaitTimeNanos.compareAndSet(currentMax, waitTimeNanos)); + } + + /** + * Records an error occurrence. + */ + public void recordError() { + totalErrors.incrementAndGet(); + } + + /** + * Gets the current number of active connections. + * + * @return number of active connections + */ + public int getActiveConnections() { + return activeConnections.get(); + } + + /** + * Gets the total number of requests made. + * + * @return total request count + */ + public long getTotalRequests() { + return totalRequests.get(); + } + + /** + * Gets the total number of acquire requests (alias for getTotalRequests). + * + * @return total acquire request count + */ + public long getTotalAcquireRequests() { + return totalRequests.get(); + } + + /** + * Gets the total number of errors encountered. + * + * @return total error count + */ + public long getTotalErrors() { + return totalErrors.get(); + } + + /** + * Gets the average wait time in nanoseconds. + * + * @return average wait time, or 0 if no requests have been made + */ + public long getAverageWaitTimeNanos() { + long requests = totalRequests.get(); + return requests > 0 ? totalWaitTimeNanos.get() / requests : 0; + } + + /** + * Gets the average wait time in milliseconds. + * + * @return average wait time in milliseconds + */ + public double getAverageWaitTimeMillis() { + return getAverageWaitTimeNanos() / 1_000_000.0; + } + + /** + * Gets the minimum wait time in nanoseconds. + * + * @return minimum wait time, or 0 if no requests have been made + */ + public long getMinWaitTimeNanos() { + long min = minWaitTimeNanos.get(); + return min == Long.MAX_VALUE ? 0 : min; + } + + /** + * Gets the minimum wait time in milliseconds. + * + * @return minimum wait time in milliseconds + */ + public double getMinWaitTimeMillis() { + return getMinWaitTimeNanos() / 1_000_000.0; + } + + /** + * Gets the maximum wait time in nanoseconds. + * + * @return maximum wait time + */ + public long getMaxWaitTimeNanos() { + return maxWaitTimeNanos.get(); + } + + /** + * Gets the maximum wait time in milliseconds. + * + * @return maximum wait time in milliseconds + */ + public double getMaxWaitTimeMillis() { + return getMaxWaitTimeNanos() / 1_000_000.0; + } + + /** + * Gets the error rate as a percentage. + * + * @return error rate (0-100), or 0 if no requests have been made + */ + public double getErrorRate() { + long requests = totalRequests.get(); + return requests > 0 ? (totalErrors.get() * 100.0) / requests : 0.0; + } + + /** + * Resets all metrics to their initial values. + */ + public void reset() { + activeConnections.set(0); + totalWaitTimeNanos.set(0); + totalRequests.set(0); + totalErrors.set(0); + minWaitTimeNanos.set(Long.MAX_VALUE); + maxWaitTimeNanos.set(0); + } + + @Override + public String toString() { + return String.format( + "PoolMetrics{active=%d, requests=%d, errors=%d, avgWaitMs=%.2f, minWaitMs=%.2f, maxWaitMs=%.2f, errorRate=%.2f%%}", + getActiveConnections(), + getTotalRequests(), + getTotalErrors(), + getAverageWaitTimeMillis(), + getMinWaitTimeMillis(), + getMaxWaitTimeMillis(), + getErrorRate()); + } +} diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/UsersTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/UsersTcpClient.java index 7960302ec3..ddccc4a090 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/UsersTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/UsersTcpClient.java @@ -61,15 +61,17 @@ public CompletableFuture login(String username, String password) { log.debug("Logging in user: {}", username); - return connection.send(CommandCode.User.LOGIN.getValue(), payload).thenApply(response -> { - try { - // Read the user ID from response (4-byte unsigned int LE) - var userId = response.readUnsignedIntLE(); - return new IdentityInfo(userId, Optional.empty()); - } finally { - response.release(); - } - }); + return connection + .broadcastAsync(CommandCode.User.LOGIN.getValue(), payload) + .thenApply(response -> { + try { + // Read the user ID from response (4-byte unsigned int LE) + var userId = response.readUnsignedIntLE(); + return new IdentityInfo(userId, Optional.empty()); + } finally { + response.release(); + } + }); } @Override @@ -78,9 +80,11 @@ public CompletableFuture logout() { log.debug("Logging out"); - return connection.send(CommandCode.User.LOGOUT.getValue(), payload).thenAccept(response -> { - response.release(); - log.debug("Logged out successfully"); - }); + return connection + .broadcastAsync(CommandCode.User.LOGOUT.getValue(), payload) + .thenAccept(response -> { + response.release(); + log.debug("Logged out successfully"); + }); } } diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java index f529cffc7e..2b14e27a1d 100644 --- a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java +++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java @@ -387,4 +387,28 @@ void testConcurrentOperations() throws Exception { log.info("Successfully completed {} concurrent operations", operations.size()); } + + @Test + @Order(11) + public void testConnectionPoolMetrics() { + log.info("Testing Connection Pool Metrics"); + + // Retrieves the pool metrics object + var metrics = client.getTcpConnectionMetrics(); + + assertThat(metrics).isNotNull(); + + // By Now We must Have Some Aquire Requests + assertThat(metrics.getTotalAcquireRequests()).isGreaterThan(0); + + // Active connections should be 0 (or low) because all previous tests finished + // Note: It might not be exactly 0 if Netty hasn't fully released everything instantly, + // but it should be less than the default pool size (5). + assertThat(metrics.getActiveConnections()).isLessThanOrEqualTo(5); + + log.info( + "Metrics Verified: Requests={}, AvgWait={}ms", + metrics.getTotalAcquireRequests(), + metrics.getAverageWaitTimeMillis()); + } }