From 902da18b51132bdea860148272eaa99071c9f55f Mon Sep 17 00:00:00 2001 From: sinri Date: Fri, 31 Oct 2025 18:14:19 +0800 Subject: [PATCH 1/2] Upgrade this library to vert.x 5 --- pom.xml | 2 +- .../vertx/javaio/OutputToReadStream.java | 359 +++++++------- .../vertx/javaio/WriteToInputStream.java | 401 ++++++++------- .../vertx/javaio/OutputToReadStreamTest.java | 23 +- .../vertx/javaio/WriteToInputStreamTest.java | 456 +++++++++--------- 5 files changed, 618 insertions(+), 623 deletions(-) diff --git a/pom.xml b/pom.xml index dad71a3..bb72461 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ Utilities for integrating Vert.x IO with Java IO - 4.1.8 + 5.0.5 11 11 11 diff --git a/src/main/java/io/cloudonix/vertx/javaio/OutputToReadStream.java b/src/main/java/io/cloudonix/vertx/javaio/OutputToReadStream.java index f2b3792..e3661db 100644 --- a/src/main/java/io/cloudonix/vertx/javaio/OutputToReadStream.java +++ b/src/main/java/io/cloudonix/vertx/javaio/OutputToReadStream.java @@ -4,6 +4,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Objects; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicLong; @@ -41,180 +42,186 @@ * @author guss77 */ public class OutputToReadStream extends OutputStream implements ReadStream { - - private AtomicReference paused = new AtomicReference<>(new CountDownLatch(0)); - private boolean closed; - private AtomicLong demand = new AtomicLong(0); - private Handler endHandler = v -> {}; - private Handler dataHandler = d -> {}; - private Handler errorHandler = t -> {}; - private Context context; - - public OutputToReadStream(Vertx vertx) { - context = vertx.getOrCreateContext(); - } - - /** - * Helper utility to pipe a Java {@link InputStream} to a {@link WriteStream}. - * - * This method is non-blocking and Vert.x context safe. 
It uses the common ForkJoinPool to perform the - * Java blocking IO and will try to propagate IO failures to the returned {@link Future}. - * - * This method uses {@link InputStream#transferTo(OutputStream)} to copy all the data, and will then - * attempt to close both streams asynchronously. Some Java compilers might not detect that the streams - * will be safely closed and will issue leak warnings. - * - * @param source InputStream to drain - * @param sink WriteStream to pipe data to - * @return a Future that will succeed when all the data have been written and the streams closed, or fail if an - * {@link IOException} has occurred - */ - public Future pipeFromInput(InputStream source, WriteStream sink) { - Promise promise = Promise.promise(); - pipeTo(sink, promise); - ForkJoinPool.commonPool().submit(() -> { - try (final InputStream is = source; final OutputStream os = this){ - source.transferTo(this); - } catch (IOException e) { - promise.tryFail(e); - } - }); - return promise.future(); - } - - /** - * Helper utility to pipe a Java {@link InputStream} to a {@link WriteStream}. - * - * This method is non-blocking and Vert.x context safe. It uses the common ForkJoinPool to perform the - * Java blocking IO and will try to propagate IO failures to the returned {@link Future} - * - * This method uses {@link InputStream#transferTo(OutputStream)} to copy all the data, and will then - * attempt to close both streams asynchronously. Some Java compilers might not detect that the streams - * will be safely closed and will issue leak warnings. - * - * @param source InputStream to drain - * @param sink WriteStream to pipe data to - * @param handler a handler that will be called when all the data have been written and the streams closed, - * or if an {@link IOException} has occurred. 
- */ - public void pipeFromInput(InputStream source, WriteStream sink, Handler> handler) { - pipeFromInput(source, sink).onComplete(handler); - } - - /** - * Propagate an out-of-band error (likely generated or handled by the code that feeds the output stream) to the end of the - * read stream to let them know that the result is not going to be good. - * @param t error to be propagated down the stream - */ - public void sendError(Throwable t) { - context.executeBlocking(p -> { - try { - errorHandler.handle(t); - } finally { - p.tryComplete(); - } - }); - } - - /* ReadStream stuff */ - - @Override - public OutputToReadStream exceptionHandler(Handler handler) { - // we are usually not propagating exceptions as OutputStream has no mechanism for propagating exceptions down, - // except when wrapping an input stream, in which case we can forward InputStream read errors to the error handler. - errorHandler = Objects.requireNonNullElse(handler, t -> {}); - return this; - } - - @Override - public OutputToReadStream handler(Handler handler) { - this.dataHandler = Objects.requireNonNullElse(handler, d -> {}); - return this; - } - - @Override - public OutputToReadStream pause() { - paused.getAndSet(new CountDownLatch(1)).countDown(); - return this; - } - - @Override - public OutputToReadStream resume() { - paused.getAndSet(new CountDownLatch(0)).countDown(); - return this; - } - - @Override - public OutputToReadStream fetch(long amount) { - resume(); - demand.addAndGet(amount); - return null; - } - - @Override - public OutputToReadStream endHandler(Handler endHandler) { - this.endHandler = Objects.requireNonNullElse(endHandler, v -> {}); - return this; - } - - /* OutputStream stuff */ - - @Override - synchronized public void write(int b) throws IOException { - if (closed) - throw new IOException("OutputStream is closed"); - try { - paused.get().await(); - } catch (InterruptedException e) { - throw new IOException("Interrupted a wait for stream to resume", e); - } - 
push(Buffer.buffer(1).appendByte((byte) (b & 0xFF))); - } - - @Override - synchronized public void write(byte[] b, int off, int len) throws IOException { - if (closed) - throw new IOException("OutputStream is closed"); - try { - paused.get().await(); - } catch (InterruptedException e) { - throw new IOException("Interrupted a wait for stream to resume", e); - } - push(Buffer.buffer(len - off).appendBytes(b, off, len)); - } - - @Override - synchronized public void close() throws IOException { - if (closed) - return; - closed = true; - try { - paused.get().await(); - } catch (InterruptedException e) { - throw new IOException("Interrupted a wait for stream to resume", e); - } - push(null); - } - - /* Internal implementation */ - - private void push(Buffer data) { - var awaiter = new CountDownLatch(1); - context.runOnContext(v -> { - try { - if (data == null) // end of stream - endHandler.handle(null); - else - dataHandler.handle(data); - } catch (Throwable t) { - errorHandler.handle(t); - } finally { - awaiter.countDown(); - } - }); - try { - awaiter.await(); - } catch (InterruptedException e) { } - } - + + private AtomicReference paused = new AtomicReference<>(new CountDownLatch(0)); + private boolean closed; + private AtomicLong demand = new AtomicLong(0); + private Handler endHandler = v -> { + }; + private Handler dataHandler = d -> { + }; + private Handler errorHandler = t -> { + }; + private Context context; + + public OutputToReadStream(Vertx vertx) { + context = vertx.getOrCreateContext(); + } + + /** + * Helper utility to pipe a Java {@link InputStream} to a {@link WriteStream}. + *

+ * This method is non-blocking and Vert.x context safe. It uses the common ForkJoinPool to perform the + * Java blocking IO and will try to propagate IO failures to the returned {@link Future}. + *

+ * This method uses {@link InputStream#transferTo(OutputStream)} to copy all the data, and will then + * attempt to close both streams asynchronously. Some Java compilers might not detect that the streams + * will be safely closed and will issue leak warnings. + * + * @param source InputStream to drain + * @param sink WriteStream to pipe data to + * @return a Future that will succeed when all the data have been written and the streams closed, or fail if an + * {@link IOException} has occurred + */ + public Future pipeFromInput(InputStream source, WriteStream sink) { + Promise promise = Promise.promise(); + pipeTo(sink).onComplete(promise); + ForkJoinPool.commonPool().submit(() -> { + try (final InputStream is = source; final OutputStream os = this) { + source.transferTo(this); + } catch (IOException e) { + promise.tryFail(e); + } + }); + return promise.future(); + } + + /** + * Helper utility to pipe a Java {@link InputStream} to a {@link WriteStream}. + *

+ * This method is non-blocking and Vert.x context safe. It uses the common ForkJoinPool to perform the + * Java blocking IO and will try to propagate IO failures to the returned {@link Future} + *

+ * This method uses {@link InputStream#transferTo(OutputStream)} to copy all the data, and will then + * attempt to close both streams asynchronously. Some Java compilers might not detect that the streams + * will be safely closed and will issue leak warnings. + * + * @param source InputStream to drain + * @param sink WriteStream to pipe data to + * @param handler a handler that will be called when all the data have been written and the streams closed, + * or if an {@link IOException} has occurred. + */ + public void pipeFromInput(InputStream source, WriteStream sink, Handler> handler) { + pipeFromInput(source, sink).onComplete(handler); + } + + /** + * Propagate an out-of-band error (likely generated or handled by the code that feeds the output stream) to the end + * of the + * read stream to let them know that the result is not going to be good. + * + * @param t error to be propagated down the stream + */ + public void sendError(Throwable t) { + context.executeBlocking((Callable) () -> { + errorHandler.handle(t); + return null; + }); + } + + /* ReadStream stuff */ + + @Override + public OutputToReadStream exceptionHandler(Handler handler) { + // we are usually not propagating exceptions as OutputStream has no mechanism for propagating exceptions down, + // except when wrapping an input stream, in which case we can forward InputStream read errors to the error handler. 
+ errorHandler = Objects.requireNonNullElse(handler, t -> { + }); + return this; + } + + @Override + public OutputToReadStream handler(Handler handler) { + this.dataHandler = Objects.requireNonNullElse(handler, d -> { + }); + return this; + } + + @Override + public OutputToReadStream pause() { + paused.getAndSet(new CountDownLatch(1)).countDown(); + return this; + } + + @Override + public OutputToReadStream resume() { + paused.getAndSet(new CountDownLatch(0)).countDown(); + return this; + } + + @Override + public OutputToReadStream fetch(long amount) { + resume(); + demand.addAndGet(amount); + return null; + } + + @Override + public OutputToReadStream endHandler(Handler endHandler) { + this.endHandler = Objects.requireNonNullElse(endHandler, v -> { + }); + return this; + } + + /* OutputStream stuff */ + + @Override + synchronized public void write(int b) throws IOException { + if (closed) + throw new IOException("OutputStream is closed"); + try { + paused.get().await(); + } catch (InterruptedException e) { + throw new IOException("Interrupted a wait for stream to resume", e); + } + push(Buffer.buffer(1).appendByte((byte) (b & 0xFF))); + } + + @Override + synchronized public void write(byte[] b, int off, int len) throws IOException { + if (closed) + throw new IOException("OutputStream is closed"); + try { + paused.get().await(); + } catch (InterruptedException e) { + throw new IOException("Interrupted a wait for stream to resume", e); + } + push(Buffer.buffer(len - off).appendBytes(b, off, len)); + } + + @Override + synchronized public void close() throws IOException { + if (closed) + return; + closed = true; + try { + paused.get().await(); + } catch (InterruptedException e) { + throw new IOException("Interrupted a wait for stream to resume", e); + } + push(null); + } + + /* Internal implementation */ + + private void push(Buffer data) { + var awaiter = new CountDownLatch(1); + context.runOnContext(v -> { + try { + if (data == null) // end of stream + 
endHandler.handle(null); + else + dataHandler.handle(data); + } catch (Throwable t) { + errorHandler.handle(t); + } finally { + awaiter.countDown(); + } + }); + try { + awaiter.await(); + } catch (InterruptedException e) { + } + } + } diff --git a/src/main/java/io/cloudonix/vertx/javaio/WriteToInputStream.java b/src/main/java/io/cloudonix/vertx/javaio/WriteToInputStream.java index 5412e79..e454205 100644 --- a/src/main/java/io/cloudonix/vertx/javaio/WriteToInputStream.java +++ b/src/main/java/io/cloudonix/vertx/javaio/WriteToInputStream.java @@ -27,205 +27,202 @@ * * @author guss77 */ -public class WriteToInputStream extends InputStream implements WriteStream{ - - private class PendingWrite { - Buffer data; - Promise completion; - int position = 0; - - private PendingWrite(Buffer data, Promise completion) { - this.data = data; - this.completion = Objects.requireNonNullElse(completion, Promise.promise()); - } - - public boolean shouldDiscard() { - if (data != null && position >= data.length()) { - completion.tryComplete(); - if (everFull.compareAndSet(true, false)) - context.runOnContext(drainHandler::handle); - return true; - } - return false; - } - - public int available() { - return data == null ? 0 : data.length(); - } - - public int readNext() { - if (data == null) { - this.completion.tryComplete(); - return -1; - } - int val = 0xFF & data.getByte(position++); // get byte's bitwise value, which is what InputStream#read() is supposed to return - if (position > data.length()) - completion.tryComplete(); - return val; - } - - public int read(byte[] b, int off, int len) { - if (data == null || position >= data.length()) { - completion.tryComplete(); - return data == null ? 
-1 : 0; - } - int max = Math.min(len, data.length() - position); - data.getBytes(position, position + max, b, off); - position += max; - return max; - } - } - - private Handler drainHandler = __ -> {}; - private Handler errorHandler = t -> {}; - private volatile int maxSize = 1000; - private volatile int maxBufferSize = Integer.MAX_VALUE; - private ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); - private AtomicBoolean everFull = new AtomicBoolean(); - private volatile boolean closed = false; - private ConcurrentLinkedQueue readsWaiting = new ConcurrentLinkedQueue<>(); - private Context context; - - public WriteToInputStream(Vertx vertx) { - context = vertx.getOrCreateContext(); - } - - /** - * Runs {@link #transferTo(OutputStream)} as a blocking task in the Vert.x context - * @param os Output stream to transfer the contents of this {@linkplain WriteStream} to - * @return a promise that will resolve when the {@linkplain WriteStream} has been closed - * and all data successfully transfered to the {@linkplain OutputStream}, or reject if - * there was either a {@linkplain WriteStream} error or an {@linkplain IOException} - */ - public Future wrap(OutputStream os) { - return context.executeBlocking(p -> { - try (os) { - transferTo(os); - p.complete(); - } catch (Throwable t) { - p.fail(t); - } - }).onFailure(t -> { - if (errorHandler != null) - errorHandler.handle(t); - }).mapEmpty(); - } - - /* WriteStream stuff */ - - @Override - public WriteToInputStream drainHandler(Handler handler) { - this.drainHandler = handler; - return this; - } - - @Override - public void end(Handler> handler) { - // signal end of stream by writing a null buffer - write(null, handler); - } - - @Override - public WriteToInputStream exceptionHandler(Handler handler) { - // we don't have a way to propagate errors as we don't actually handle writing out and InputStream provides no feedback mechanism. 
- errorHandler = handler; - return this; - } - - @Override - public Future write(Buffer data) { - if (closed) - // accept all data and discard it, as unlike JDK9 Flow, we have no way to tell upstream to stop sending data - return Future.succeededFuture(); - var promise = Promise.promise(); - if (data == null) // end of stream - buffer.add(new PendingWrite(null, promise)); - else - for (int start = 0; start < data.length();) { - var buf = data.length() < maxBufferSize ? data : data.getBuffer(start, Math.min(data.length(), start + maxBufferSize)); - start += buf.length(); - buffer.add(new PendingWrite(buf, start < data.length() ? null : promise)); - } - // flush waiting reads, if any - for (var l = readsWaiting.poll(); l != null; l = readsWaiting.poll()) l.countDown(); - return promise.future(); - } - - @Override - public void write(Buffer data, Handler> handler) { - write(data).onComplete(handler); - } - - @Override - public WriteToInputStream setWriteQueueMaxSize(int maxSize) { - this.maxSize = maxSize; - return this; - } - - public WriteToInputStream setMaxChunkSize(int maxSize) { - maxBufferSize = maxSize; - return this; - } - - @Override - public boolean writeQueueFull() { - if (buffer.size() < maxSize) - return false; - everFull.compareAndSet(false, true); - return true; - } - - /* InputStream stuff */ - - @Override - synchronized public int read() throws IOException { - while (true) { - while (!buffer.isEmpty() && buffer.peek().shouldDiscard()) buffer.poll(); - if (!buffer.isEmpty()) - return buffer.peek().readNext(); - // set latch to signal we are waiting - var latch = new CountDownLatch(1); - readsWaiting.add(latch); - if (buffer.isEmpty()) - try { - latch.await(); - } catch (InterruptedException e) { - throw new IOException("Failed to wait for data", e); - } - // now try to read again - } - } - - @Override - synchronized public int read(byte[] b, int off, int len) throws IOException { - if (b.length < off + len) // sanity first - return 0; - // we are 
going to be lazy here and not read more than one pending write, even if there are more available. The contract allows for that - while (!buffer.isEmpty() && buffer.peek().shouldDiscard()) buffer.poll(); - if (!buffer.isEmpty()) - return buffer.peek().read(b, off, len); - // we still wait if there is no data, but let read() implement the blocking - int val = read(); - if (val < 0) - return val; - b[off] = (byte) (val & 0xFF); - return 1 + Math.max(0, read(b, off + 1, len - 1)); - } - - @Override - public int available() throws IOException { - return buffer.stream().map(PendingWrite::available).reduce(0, (i,a) -> i += a); - } - - @Override - public void close() throws IOException { - super.close(); - closed = true; // mark us closed, so that additional writes are NOPed - // if we have any buffered data, flush it and trigger the write completions - while (!buffer.isEmpty()) - buffer.poll().completion.tryComplete(); - // see if we need to call the drain handler to drain upstream - if (everFull.compareAndSet(true, false)) - context.runOnContext(drainHandler::handle); - } -} +public class WriteToInputStream extends InputStream implements WriteStream { + + private Handler drainHandler = __ -> { + }; + private Handler errorHandler = t -> { + }; + private volatile int maxSize = 1000; + private volatile int maxBufferSize = Integer.MAX_VALUE; + private ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); + private AtomicBoolean everFull = new AtomicBoolean(); + private volatile boolean closed = false; + private ConcurrentLinkedQueue readsWaiting = new ConcurrentLinkedQueue<>(); + private Context context; + + public WriteToInputStream(Vertx vertx) { + context = vertx.getOrCreateContext(); + } + + /** + * Runs {@link #transferTo(OutputStream)} as a blocking task in the Vert.x context + * + * @param os Output stream to transfer the contents of this {@linkplain WriteStream} to + * @return a promise that will resolve when the {@linkplain WriteStream} has been closed + * 
and all data successfully transfered to the {@linkplain OutputStream}, or reject if + * there was either a {@linkplain WriteStream} error or an {@linkplain IOException} + */ + public Future wrap(OutputStream os) { + return context.executeBlocking(() -> { + try (os) { + transferTo(os); + return null; + } + }) + .onFailure(t -> { + if (errorHandler != null) + errorHandler.handle(t); + }) + .mapEmpty(); + } + + @Override + public WriteToInputStream drainHandler(Handler handler) { + this.drainHandler = handler; + return this; + } + + /* WriteStream stuff */ + + @Override + public Future end() { + return write(null); + } + + @Override + public WriteToInputStream exceptionHandler(Handler handler) { + // we don't have a way to propagate errors as we don't actually handle writing out and InputStream provides no feedback mechanism. + errorHandler = handler; + return this; + } + + @Override + public Future write(Buffer data) { + if (closed) + // accept all data and discard it, as unlike JDK9 Flow, we have no way to tell upstream to stop sending data + return Future.succeededFuture(); + var promise = Promise.promise(); + if (data == null) // end of stream + buffer.add(new PendingWrite(null, promise)); + else + for (int start = 0; start < data.length(); ) { + var buf = data.length() < maxBufferSize ? data : data.getBuffer(start, Math.min(data.length(), start + maxBufferSize)); + start += buf.length(); + buffer.add(new PendingWrite(buf, start < data.length() ? 
null : promise)); + } + // flush waiting reads, if any + for (var l = readsWaiting.poll(); l != null; l = readsWaiting.poll()) l.countDown(); + return promise.future(); + } + + @Override + public WriteToInputStream setWriteQueueMaxSize(int maxSize) { + this.maxSize = maxSize; + return this; + } + + public WriteToInputStream setMaxChunkSize(int maxSize) { + maxBufferSize = maxSize; + return this; + } + + @Override + public boolean writeQueueFull() { + if (buffer.size() < maxSize) + return false; + everFull.compareAndSet(false, true); + return true; + } + + @Override + synchronized public int read() throws IOException { + while (true) { + while (!buffer.isEmpty() && buffer.peek().shouldDiscard()) buffer.poll(); + if (!buffer.isEmpty()) + return buffer.peek().readNext(); + // set latch to signal we are waiting + var latch = new CountDownLatch(1); + readsWaiting.add(latch); + if (buffer.isEmpty()) + try { + latch.await(); + } catch (InterruptedException e) { + throw new IOException("Failed to wait for data", e); + } + // now try to read again + } + } + + /* InputStream stuff */ + + @Override + synchronized public int read(byte[] b, int off, int len) throws IOException { + if (b.length < off + len) // sanity first + return 0; + // we are going to be lazy here and not read more than one pending write, even if there are more available. 
The contract allows for that + while (!buffer.isEmpty() && buffer.peek().shouldDiscard()) buffer.poll(); + if (!buffer.isEmpty()) + return buffer.peek().read(b, off, len); + // we still wait if there is no data, but let read() implement the blocking + int val = read(); + if (val < 0) + return val; + b[off] = (byte) (val & 0xFF); + return 1 + Math.max(0, read(b, off + 1, len - 1)); + } + + @Override + public int available() throws IOException { + return buffer.stream().map(PendingWrite::available).reduce(0, (i, a) -> i += a); + } + + @Override + public void close() throws IOException { + super.close(); + closed = true; // mark us closed, so that additional writes are NOPed + // if we have any buffered data, flush it and trigger the write completions + while (!buffer.isEmpty()) + buffer.poll().completion.tryComplete(); + // see if we need to call the drain handler to drain upstream + if (everFull.compareAndSet(true, false)) + context.runOnContext(drainHandler::handle); + } + + private class PendingWrite { + Buffer data; + Promise completion; + int position = 0; + + private PendingWrite(Buffer data, Promise completion) { + this.data = data; + this.completion = Objects.requireNonNullElse(completion, Promise.promise()); + } + + public boolean shouldDiscard() { + if (data != null && position >= data.length()) { + completion.tryComplete(); + if (everFull.compareAndSet(true, false)) + context.runOnContext(drainHandler::handle); + return true; + } + return false; + } + + public int available() { + return data == null ? 
0 : data.length(); + } + + public int readNext() { + if (data == null) { + this.completion.tryComplete(); + return -1; + } + int val = 0xFF & data.getByte(position++); // get byte's bitwise value, which is what InputStream#read() is supposed to return + if (position > data.length()) + completion.tryComplete(); + return val; + } + + public int read(byte[] b, int off, int len) { + if (data == null || position >= data.length()) { + completion.tryComplete(); + return data == null ? -1 : 0; + } + int max = Math.min(len, data.length() - position); + data.getBytes(position, position + max, b, off); + position += max; + return max; + } + } +} \ No newline at end of file diff --git a/src/test/java/io/cloudonix/vertx/javaio/OutputToReadStreamTest.java b/src/test/java/io/cloudonix/vertx/javaio/OutputToReadStreamTest.java index 4561dec..fd9efb0 100644 --- a/src/test/java/io/cloudonix/vertx/javaio/OutputToReadStreamTest.java +++ b/src/test/java/io/cloudonix/vertx/javaio/OutputToReadStreamTest.java @@ -34,12 +34,7 @@ private TestBufferWriteStream(Buffer output, Handler> resultHa public boolean writeQueueFull() { return false; } - - @Override - public void write(Buffer data, Handler> handler) { - write(data).onComplete(handler); - } - + @Override public Future write(Buffer data) { output.appendBuffer(data); @@ -55,14 +50,14 @@ public WriteStream setWriteQueueMaxSize(int maxSize) { public WriteStream exceptionHandler(Handler handler) { return this; } - - @Override - public void end(Handler> handler) { - resultHandler.handle(null); - handler.handle(Future.succeededFuture()); - } - - @Override + + @Override + public Future end() { + resultHandler.handle(null); + return Future.succeededFuture(); + } + + @Override public WriteStream drainHandler(Handler handler) { return this; } diff --git a/src/test/java/io/cloudonix/vertx/javaio/WriteToInputStreamTest.java b/src/test/java/io/cloudonix/vertx/javaio/WriteToInputStreamTest.java index b998292..e68f17c 100644 --- 
a/src/test/java/io/cloudonix/vertx/javaio/WriteToInputStreamTest.java +++ b/src/test/java/io/cloudonix/vertx/javaio/WriteToInputStreamTest.java @@ -1,16 +1,5 @@ package io.cloudonix.vertx.javaio; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.CoreMatchers.*; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.lang.System.Logger; -import java.lang.System.Logger.Level; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; @@ -19,227 +8,234 @@ import io.vertx.core.streams.ReadStream; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.lang.System.Logger; +import java.lang.System.Logger.Level; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; @ExtendWith(VertxExtension.class) class WriteToInputStreamTest { - - private final static Logger log = System.getLogger(WriteToInputStream.class.getName()); - - @Test - void test(Vertx vertx, VertxTestContext ctx) throws IOException, InterruptedException { - log.log(Level.INFO, "Starting test"); - var text = "hello world"; - var sink = new ByteArrayOutputStream(); - var cp = ctx.checkpoint(); - var stream = new WriteToInputStream(vertx); - (new ReadStream() { - private Handler endHandler; - private Handler handler; - - @Override - public ReadStream exceptionHandler(Handler handler) { - return this; - } - - @Override - public ReadStream handler(Handler handler) { - this.handler = handler; - return this; - } - - @Override - public ReadStream pause() { - return this; - } - - @Override - public ReadStream resume() { - vertx.setTimer(100, i -> { - 
handler.handle(Buffer.buffer(text)); - endHandler.handle(null); - }); - return this; - } - - @Override - public ReadStream fetch(long amount) { - return this; - } - - @Override - public ReadStream endHandler(Handler endHandler) { - this.endHandler = endHandler; - return this; - } - }).pipeTo(stream).onFailure(ctx::failNow); - vertx.executeBlocking(p -> { - try { - stream.transferTo(sink); - stream.close(); - sink.close(); - p.complete(sink.toByteArray()); - } catch (IOException e) { - p.fail(e); - } - }).onComplete(ctx.succeeding(res -> { - assertThat(res, is(equalTo(text.getBytes()))); - cp.flag(); - })); - } - - @Test - void testLargeTransfer(Vertx vertx, VertxTestContext ctx) throws IOException, InterruptedException { - log.log(Level.INFO, "Starting testLargeTransfer"); - var text = "hello world"; - int count = 10000; - var sink = new ByteArrayOutputStream(); - var cp = ctx.checkpoint(); - var stream = new WriteToInputStream(vertx); - (new ReadStream() { - private int remaining = count; - private Handler endHandler; - private Handler handler; - - @Override - public ReadStream exceptionHandler(Handler handler) { - return this; - } - - @Override - public ReadStream handler(Handler handler) { - this.handler = handler; - return this; - } - - @Override - public ReadStream pause() { - return this; - } - - private void transfer() { - if (remaining-- > 0) { - handler.handle(Buffer.buffer(text)); - vertx.setTimer(1, i -> transfer()); - } else - endHandler.handle(null); - } - - @Override - public ReadStream resume() { - vertx.setTimer(100, i -> transfer()); - return this; - } - - @Override - public ReadStream fetch(long amount) { - return this; - } - - @Override - public ReadStream endHandler(Handler endHandler) { - this.endHandler = endHandler; - return this; - } - }).pipeTo(stream).onSuccess(v -> { - }).onFailure(ctx::failNow); - vertx.executeBlocking(p -> { - try { - stream.transferTo(sink); - stream.close(); - sink.close(); - p.complete(sink.toByteArray()); - } catch 
(IOException e) { - p.fail(e); - } - }).onComplete(ctx.succeeding(res -> { - var bres = Buffer.buffer(res); - var test = text.getBytes(); - for (int i = 0; i < count; i++) { - assertThat(bres.getBuffer(i * test.length, (i+1) * test.length).getBytes(), is(equalTo(test))); - } - cp.flag(); - })); - } - - @Test - public void testConvert(Vertx vertx, VertxTestContext ctx) throws IOException { - log.log(Level.INFO, "Starting testConvert"); - var sink = new ByteArrayOutputStream(); - var result = Promise.promise(); - var os = new WriteToInputStream(vertx); - os.wrap(sink).onComplete(result); - os.end(Buffer.buffer("hello world")); - result.future() - .map(__ -> { - log.log(Level.INFO, "Testing output stream result..."); - assertThat(sink.toByteArray(), is(equalTo("hello world".getBytes()))); - return null; - }) - .onComplete(ctx.succeedingThenComplete()) - .onComplete(__ -> ctx.verify(os::close)); - } - - @Test - public void testReChunkedWrites(Vertx vertx, VertxTestContext ctx) throws IOException { - log.log(Level.INFO, "Starting testReChunkedWrites"); - var data = "hello world, this is a longish text which will be chunks"; - var sink = new ByteArrayOutputStream(); - var result = Promise.promise(); - var os = new WriteToInputStream(vertx); - os.setMaxChunkSize(10).wrap(sink).onComplete(result); - os.end(Buffer.buffer(data)); - result.future() - .map(__ -> { - log.log(Level.INFO, "Testing output stream result..."); - assertThat(sink.toByteArray(), is(equalTo(data.getBytes()))); - return null; - }) - .onComplete(ctx.succeedingThenComplete()) - .onComplete(__ -> ctx.verify(os::close)); - } - - @Test - public void testSingleByte(Vertx vertx, VertxTestContext ctx) throws IOException { - log.log(Level.INFO, "Starting testSingleByte"); - var data = new byte[] { 0x31 }; - var sink = new byte[4]; - var is = new WriteToInputStream(vertx); - vertx.runOnContext(__ -> { - Future.succeededFuture() - .compose(___ -> { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { 
} - return is.end(Buffer.buffer(data)); - }) - .onComplete(ctx.succeedingThenComplete()); - }); - var read = is.read(sink, 0, sink.length); - assertThat(sink, is(equalTo(new byte[] { 0x31, 0x00, 0x00, 0x00 }))); - assertThat(read, is(equalTo(1))); - is.close(); - } - - @Test - public void testTwoBytes(Vertx vertx, VertxTestContext ctx) throws IOException { - log.log(Level.INFO, "Starting testTwoBytes"); - var data = new byte[] { 0x31, 0x32 }; - var sink = new byte[4]; - var is = new WriteToInputStream(vertx); - vertx.runOnContext(__ -> { - Future.succeededFuture() - .compose(___ -> { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { } - return is.end(Buffer.buffer(data)); - }) - .onComplete(ctx.succeedingThenComplete()); - }); - var read = is.read(sink, 0, sink.length); - assertThat(sink, is(equalTo(new byte[] { 0x31, 0x32, 0x00, 0x00 }))); - assertThat(read, is(equalTo(2))); - is.close(); - } + + private final static Logger log = System.getLogger(WriteToInputStream.class.getName()); + + @Test + void test(Vertx vertx, VertxTestContext ctx) throws IOException, InterruptedException { + log.log(Level.INFO, "Starting test"); + var text = "hello world"; + var sink = new ByteArrayOutputStream(); + var cp = ctx.checkpoint(); + var stream = new WriteToInputStream(vertx); + (new ReadStream() { + private Handler endHandler; + private Handler handler; + + @Override + public ReadStream exceptionHandler(Handler handler) { + return this; + } + + @Override + public ReadStream handler(Handler handler) { + this.handler = handler; + return this; + } + + @Override + public ReadStream pause() { + return this; + } + + @Override + public ReadStream resume() { + vertx.setTimer(100, i -> { + handler.handle(Buffer.buffer(text)); + endHandler.handle(null); + }); + return this; + } + + @Override + public ReadStream fetch(long amount) { + return this; + } + + @Override + public ReadStream endHandler(Handler endHandler) { + this.endHandler = endHandler; + return this; + } + 
}).pipeTo(stream).onFailure(ctx::failNow); + vertx.executeBlocking(() -> { + stream.transferTo(sink); + stream.close(); + sink.close(); + return sink.toByteArray(); + }) + .onComplete(ctx.succeeding(res -> { + assertThat(res, is(equalTo(text.getBytes()))); + cp.flag(); + })); + } + + @Test + void testLargeTransfer(Vertx vertx, VertxTestContext ctx) throws IOException, InterruptedException { + log.log(Level.INFO, "Starting testLargeTransfer"); + var text = "hello world"; + int count = 10000; + var sink = new ByteArrayOutputStream(); + var cp = ctx.checkpoint(); + var stream = new WriteToInputStream(vertx); + (new ReadStream() { + private int remaining = count; + private Handler endHandler; + private Handler handler; + + @Override + public ReadStream exceptionHandler(Handler handler) { + return this; + } + + @Override + public ReadStream handler(Handler handler) { + this.handler = handler; + return this; + } + + @Override + public ReadStream pause() { + return this; + } + + private void transfer() { + if (remaining-- > 0) { + handler.handle(Buffer.buffer(text)); + vertx.setTimer(1, i -> transfer()); + } else + endHandler.handle(null); + } + + @Override + public ReadStream resume() { + vertx.setTimer(100, i -> transfer()); + return this; + } + + @Override + public ReadStream fetch(long amount) { + return this; + } + + @Override + public ReadStream endHandler(Handler endHandler) { + this.endHandler = endHandler; + return this; + } + }).pipeTo(stream).onSuccess(v -> { + }).onFailure(ctx::failNow); + vertx.executeBlocking(() -> { + stream.transferTo(sink); + stream.close(); + sink.close(); + return sink.toByteArray(); + }) + .onComplete(ctx.succeeding(res -> { + var bres = Buffer.buffer(res); + var test = text.getBytes(); + for (int i = 0; i < count; i++) { + assertThat(bres.getBuffer(i * test.length, (i + 1) * test.length).getBytes(), is(equalTo(test))); + } + cp.flag(); + })); + } + + @Test + public void testConvert(Vertx vertx, VertxTestContext ctx) throws IOException 
{ + log.log(Level.INFO, "Starting testConvert"); + var sink = new ByteArrayOutputStream(); + var result = Promise.promise(); + var os = new WriteToInputStream(vertx); + os.wrap(sink).onComplete(result); + os.end(Buffer.buffer("hello world")); + result.future() + .map(__ -> { + log.log(Level.INFO, "Testing output stream result..."); + assertThat(sink.toByteArray(), is(equalTo("hello world".getBytes()))); + return null; + }) + .onComplete(ctx.succeedingThenComplete()) + .onComplete(__ -> ctx.verify(os::close)); + } + + @Test + public void testReChunkedWrites(Vertx vertx, VertxTestContext ctx) throws IOException { + log.log(Level.INFO, "Starting testReChunkedWrites"); + var data = "hello world, this is a longish text which will be chunks"; + var sink = new ByteArrayOutputStream(); + var result = Promise.promise(); + var os = new WriteToInputStream(vertx); + os.setMaxChunkSize(10).wrap(sink).onComplete(result); + os.end(Buffer.buffer(data)); + result.future() + .map(__ -> { + log.log(Level.INFO, "Testing output stream result..."); + assertThat(sink.toByteArray(), is(equalTo(data.getBytes()))); + return null; + }) + .onComplete(ctx.succeedingThenComplete()) + .onComplete(__ -> ctx.verify(os::close)); + } + + @Test + public void testSingleByte(Vertx vertx, VertxTestContext ctx) throws IOException { + log.log(Level.INFO, "Starting testSingleByte"); + var data = new byte[]{0x31}; + var sink = new byte[4]; + var is = new WriteToInputStream(vertx); + vertx.runOnContext(__ -> { + Future.succeededFuture() + .compose(___ -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + return is.end(Buffer.buffer(data)); + }) + .onComplete(ctx.succeedingThenComplete()); + }); + var read = is.read(sink, 0, sink.length); + assertThat(sink, is(equalTo(new byte[]{0x31, 0x00, 0x00, 0x00}))); + assertThat(read, is(equalTo(1))); + is.close(); + } + + @Test + public void testTwoBytes(Vertx vertx, VertxTestContext ctx) throws IOException { + log.log(Level.INFO, "Starting 
testTwoBytes"); + var data = new byte[]{0x31, 0x32}; + var sink = new byte[4]; + var is = new WriteToInputStream(vertx); + vertx.runOnContext(__ -> { + Future.succeededFuture() + .compose(___ -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + return is.end(Buffer.buffer(data)); + }) + .onComplete(ctx.succeedingThenComplete()); + }); + var read = is.read(sink, 0, sink.length); + assertThat(sink, is(equalTo(new byte[]{0x31, 0x32, 0x00, 0x00}))); + assertThat(read, is(equalTo(2))); + is.close(); + } } From 50811c215041805f5e3c0afa2290cda8640e5947 Mon Sep 17 00:00:00 2001 From: Sinri Edogawa Date: Fri, 31 Oct 2025 19:49:53 +0800 Subject: [PATCH 2/2] Replace spaces with tabs in source and test files. --- .../vertx/javaio/OutputToReadStream.java | 351 +++++++------- .../vertx/javaio/WriteToInputStream.java | 395 ++++++++------- .../vertx/javaio/OutputToReadStreamTest.java | 12 +- .../vertx/javaio/WriteToInputStreamTest.java | 448 +++++++++--------- 4 files changed, 596 insertions(+), 610 deletions(-) diff --git a/src/main/java/io/cloudonix/vertx/javaio/OutputToReadStream.java b/src/main/java/io/cloudonix/vertx/javaio/OutputToReadStream.java index e3661db..765d3b3 100644 --- a/src/main/java/io/cloudonix/vertx/javaio/OutputToReadStream.java +++ b/src/main/java/io/cloudonix/vertx/javaio/OutputToReadStream.java @@ -43,185 +43,176 @@ */ public class OutputToReadStream extends OutputStream implements ReadStream { - private AtomicReference paused = new AtomicReference<>(new CountDownLatch(0)); - private boolean closed; - private AtomicLong demand = new AtomicLong(0); - private Handler endHandler = v -> { - }; - private Handler dataHandler = d -> { - }; - private Handler errorHandler = t -> { - }; - private Context context; - - public OutputToReadStream(Vertx vertx) { - context = vertx.getOrCreateContext(); - } - - /** - * Helper utility to pipe a Java {@link InputStream} to a {@link WriteStream}. - *

- * This method is non-blocking and Vert.x context safe. It uses the common ForkJoinPool to perform the - * Java blocking IO and will try to propagate IO failures to the returned {@link Future}. - *

- * This method uses {@link InputStream#transferTo(OutputStream)} to copy all the data, and will then - * attempt to close both streams asynchronously. Some Java compilers might not detect that the streams - * will be safely closed and will issue leak warnings. - * - * @param source InputStream to drain - * @param sink WriteStream to pipe data to - * @return a Future that will succeed when all the data have been written and the streams closed, or fail if an - * {@link IOException} has occurred - */ - public Future pipeFromInput(InputStream source, WriteStream sink) { - Promise promise = Promise.promise(); - pipeTo(sink).onComplete(promise); - ForkJoinPool.commonPool().submit(() -> { - try (final InputStream is = source; final OutputStream os = this) { - source.transferTo(this); - } catch (IOException e) { - promise.tryFail(e); - } - }); - return promise.future(); - } - - /** - * Helper utility to pipe a Java {@link InputStream} to a {@link WriteStream}. - *

- * This method is non-blocking and Vert.x context safe. It uses the common ForkJoinPool to perform the - * Java blocking IO and will try to propagate IO failures to the returned {@link Future} - *

- * This method uses {@link InputStream#transferTo(OutputStream)} to copy all the data, and will then - * attempt to close both streams asynchronously. Some Java compilers might not detect that the streams - * will be safely closed and will issue leak warnings. - * - * @param source InputStream to drain - * @param sink WriteStream to pipe data to - * @param handler a handler that will be called when all the data have been written and the streams closed, - * or if an {@link IOException} has occurred. - */ - public void pipeFromInput(InputStream source, WriteStream sink, Handler> handler) { - pipeFromInput(source, sink).onComplete(handler); - } - - /** - * Propagate an out-of-band error (likely generated or handled by the code that feeds the output stream) to the end - * of the - * read stream to let them know that the result is not going to be good. - * - * @param t error to be propagated down the stream - */ - public void sendError(Throwable t) { - context.executeBlocking((Callable) () -> { - errorHandler.handle(t); - return null; - }); - } - - /* ReadStream stuff */ - - @Override - public OutputToReadStream exceptionHandler(Handler handler) { - // we are usually not propagating exceptions as OutputStream has no mechanism for propagating exceptions down, - // except when wrapping an input stream, in which case we can forward InputStream read errors to the error handler. 
- errorHandler = Objects.requireNonNullElse(handler, t -> { - }); - return this; - } - - @Override - public OutputToReadStream handler(Handler handler) { - this.dataHandler = Objects.requireNonNullElse(handler, d -> { - }); - return this; - } - - @Override - public OutputToReadStream pause() { - paused.getAndSet(new CountDownLatch(1)).countDown(); - return this; - } - - @Override - public OutputToReadStream resume() { - paused.getAndSet(new CountDownLatch(0)).countDown(); - return this; - } - - @Override - public OutputToReadStream fetch(long amount) { - resume(); - demand.addAndGet(amount); - return null; - } - - @Override - public OutputToReadStream endHandler(Handler endHandler) { - this.endHandler = Objects.requireNonNullElse(endHandler, v -> { - }); - return this; - } - - /* OutputStream stuff */ - - @Override - synchronized public void write(int b) throws IOException { - if (closed) - throw new IOException("OutputStream is closed"); - try { - paused.get().await(); - } catch (InterruptedException e) { - throw new IOException("Interrupted a wait for stream to resume", e); - } - push(Buffer.buffer(1).appendByte((byte) (b & 0xFF))); - } - - @Override - synchronized public void write(byte[] b, int off, int len) throws IOException { - if (closed) - throw new IOException("OutputStream is closed"); - try { - paused.get().await(); - } catch (InterruptedException e) { - throw new IOException("Interrupted a wait for stream to resume", e); - } - push(Buffer.buffer(len - off).appendBytes(b, off, len)); - } - - @Override - synchronized public void close() throws IOException { - if (closed) - return; - closed = true; - try { - paused.get().await(); - } catch (InterruptedException e) { - throw new IOException("Interrupted a wait for stream to resume", e); - } - push(null); - } - - /* Internal implementation */ - - private void push(Buffer data) { - var awaiter = new CountDownLatch(1); - context.runOnContext(v -> { - try { - if (data == null) // end of stream - 
endHandler.handle(null); - else - dataHandler.handle(data); - } catch (Throwable t) { - errorHandler.handle(t); - } finally { - awaiter.countDown(); - } - }); - try { - awaiter.await(); - } catch (InterruptedException e) { - } - } + private AtomicReference paused = new AtomicReference<>(new CountDownLatch(0)); + private boolean closed; + private AtomicLong demand = new AtomicLong(0); + private Handler endHandler = v -> {}; + private Handler dataHandler = d -> {}; + private Handler errorHandler = t -> {}; + private Context context; + + public OutputToReadStream(Vertx vertx) { + context = vertx.getOrCreateContext(); + } + + /** + * Helper utility to pipe a Java {@link InputStream} to a {@link WriteStream}. + * + * This method is non-blocking and Vert.x context safe. It uses the common ForkJoinPool to perform the + * Java blocking IO and will try to propagate IO failures to the returned {@link Future}. + * + * This method uses {@link InputStream#transferTo(OutputStream)} to copy all the data, and will then + * attempt to close both streams asynchronously. Some Java compilers might not detect that the streams + * will be safely closed and will issue leak warnings. + * + * @param source InputStream to drain + * @param sink WriteStream to pipe data to + * @return a Future that will succeed when all the data have been written and the streams closed, or fail if an + * {@link IOException} has occurred + */ + public Future pipeFromInput(InputStream source, WriteStream sink) { + Promise promise = Promise.promise(); + pipeTo(sink).onComplete(promise); + ForkJoinPool.commonPool().submit(() -> { + try (final InputStream is = source; final OutputStream os = this) { + source.transferTo(this); + } catch (IOException e) { + promise.tryFail(e); + } + }); + return promise.future(); + } + + /** + * Helper utility to pipe a Java {@link InputStream} to a {@link WriteStream}. + * + * This method is non-blocking and Vert.x context safe. 
It uses the common ForkJoinPool to perform the + * Java blocking IO and will try to propagate IO failures to the returned {@link Future} + * + * This method uses {@link InputStream#transferTo(OutputStream)} to copy all the data, and will then + * attempt to close both streams asynchronously. Some Java compilers might not detect that the streams + * will be safely closed and will issue leak warnings. + * + * @param source InputStream to drain + * @param sink WriteStream to pipe data to + * @param handler a handler that will be called when all the data have been written and the streams closed, + * or if an {@link IOException} has occurred. + */ + public void pipeFromInput(InputStream source, WriteStream sink, Handler> handler) { + pipeFromInput(source, sink).onComplete(handler); + } + + /** + * Propagate an out-of-band error (likely generated or handled by the code that feeds the output stream) to the end of the + * read stream to let them know that the result is not going to be good. + * @param t error to be propagated down the stream + */ + public void sendError(Throwable t) { + context.executeBlocking((Callable) () -> { + errorHandler.handle(t); + return null; + }); + } + + /* ReadStream stuff */ + + @Override + public OutputToReadStream exceptionHandler(Handler handler) { + // we are usually not propagating exceptions as OutputStream has no mechanism for propagating exceptions down, + // except when wrapping an input stream, in which case we can forward InputStream read errors to the error handler. 
+ errorHandler = Objects.requireNonNullElse(handler, t -> {}); + return this; + } + + @Override + public OutputToReadStream handler(Handler handler) { + this.dataHandler = Objects.requireNonNullElse(handler, d -> {}); + return this; + } + + @Override + public OutputToReadStream pause() { + paused.getAndSet(new CountDownLatch(1)).countDown(); + return this; + } + + @Override + public OutputToReadStream resume() { + paused.getAndSet(new CountDownLatch(0)).countDown(); + return this; + } + + @Override + public OutputToReadStream fetch(long amount) { + resume(); + demand.addAndGet(amount); + return null; + } + + @Override + public OutputToReadStream endHandler(Handler endHandler) { + this.endHandler = Objects.requireNonNullElse(endHandler, v -> {}); + return this; + } + + /* OutputStream stuff */ + + @Override + synchronized public void write(int b) throws IOException { + if (closed) + throw new IOException("OutputStream is closed"); + try { + paused.get().await(); + } catch (InterruptedException e) { + throw new IOException("Interrupted a wait for stream to resume", e); + } + push(Buffer.buffer(1).appendByte((byte) (b & 0xFF))); + } + + @Override + synchronized public void write(byte[] b, int off, int len) throws IOException { + if (closed) + throw new IOException("OutputStream is closed"); + try { + paused.get().await(); + } catch (InterruptedException e) { + throw new IOException("Interrupted a wait for stream to resume", e); + } + push(Buffer.buffer(len - off).appendBytes(b, off, len)); + } + + @Override + synchronized public void close() throws IOException { + if (closed) + return; + closed = true; + try { + paused.get().await(); + } catch (InterruptedException e) { + throw new IOException("Interrupted a wait for stream to resume", e); + } + push(null); + } + + /* Internal implementation */ + + private void push(Buffer data) { + var awaiter = new CountDownLatch(1); + context.runOnContext(v -> { + try { + if (data == null) // end of stream + 
endHandler.handle(null); + else + dataHandler.handle(data); + } catch (Throwable t) { + errorHandler.handle(t); + } finally { + awaiter.countDown(); + } + }); + try { + awaiter.await(); + } catch (InterruptedException e) { } + } } diff --git a/src/main/java/io/cloudonix/vertx/javaio/WriteToInputStream.java b/src/main/java/io/cloudonix/vertx/javaio/WriteToInputStream.java index e454205..52228d4 100644 --- a/src/main/java/io/cloudonix/vertx/javaio/WriteToInputStream.java +++ b/src/main/java/io/cloudonix/vertx/javaio/WriteToInputStream.java @@ -27,202 +27,199 @@ * * @author guss77 */ -public class WriteToInputStream extends InputStream implements WriteStream { - - private Handler drainHandler = __ -> { - }; - private Handler errorHandler = t -> { - }; - private volatile int maxSize = 1000; - private volatile int maxBufferSize = Integer.MAX_VALUE; - private ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); - private AtomicBoolean everFull = new AtomicBoolean(); - private volatile boolean closed = false; - private ConcurrentLinkedQueue readsWaiting = new ConcurrentLinkedQueue<>(); - private Context context; - - public WriteToInputStream(Vertx vertx) { - context = vertx.getOrCreateContext(); - } - - /** - * Runs {@link #transferTo(OutputStream)} as a blocking task in the Vert.x context - * - * @param os Output stream to transfer the contents of this {@linkplain WriteStream} to - * @return a promise that will resolve when the {@linkplain WriteStream} has been closed - * and all data successfully transfered to the {@linkplain OutputStream}, or reject if - * there was either a {@linkplain WriteStream} error or an {@linkplain IOException} - */ - public Future wrap(OutputStream os) { - return context.executeBlocking(() -> { - try (os) { - transferTo(os); - return null; - } - }) - .onFailure(t -> { - if (errorHandler != null) - errorHandler.handle(t); - }) - .mapEmpty(); - } - - @Override - public WriteToInputStream drainHandler(Handler handler) { - 
this.drainHandler = handler; - return this; - } - - /* WriteStream stuff */ - - @Override - public Future end() { - return write(null); - } - - @Override - public WriteToInputStream exceptionHandler(Handler handler) { - // we don't have a way to propagate errors as we don't actually handle writing out and InputStream provides no feedback mechanism. - errorHandler = handler; - return this; - } - - @Override - public Future write(Buffer data) { - if (closed) - // accept all data and discard it, as unlike JDK9 Flow, we have no way to tell upstream to stop sending data - return Future.succeededFuture(); - var promise = Promise.promise(); - if (data == null) // end of stream - buffer.add(new PendingWrite(null, promise)); - else - for (int start = 0; start < data.length(); ) { - var buf = data.length() < maxBufferSize ? data : data.getBuffer(start, Math.min(data.length(), start + maxBufferSize)); - start += buf.length(); - buffer.add(new PendingWrite(buf, start < data.length() ? null : promise)); - } - // flush waiting reads, if any - for (var l = readsWaiting.poll(); l != null; l = readsWaiting.poll()) l.countDown(); - return promise.future(); - } - - @Override - public WriteToInputStream setWriteQueueMaxSize(int maxSize) { - this.maxSize = maxSize; - return this; - } - - public WriteToInputStream setMaxChunkSize(int maxSize) { - maxBufferSize = maxSize; - return this; - } - - @Override - public boolean writeQueueFull() { - if (buffer.size() < maxSize) - return false; - everFull.compareAndSet(false, true); - return true; - } - - @Override - synchronized public int read() throws IOException { - while (true) { - while (!buffer.isEmpty() && buffer.peek().shouldDiscard()) buffer.poll(); - if (!buffer.isEmpty()) - return buffer.peek().readNext(); - // set latch to signal we are waiting - var latch = new CountDownLatch(1); - readsWaiting.add(latch); - if (buffer.isEmpty()) - try { - latch.await(); - } catch (InterruptedException e) { - throw new IOException("Failed to wait 
for data", e); - } - // now try to read again - } - } - - /* InputStream stuff */ - - @Override - synchronized public int read(byte[] b, int off, int len) throws IOException { - if (b.length < off + len) // sanity first - return 0; - // we are going to be lazy here and not read more than one pending write, even if there are more available. The contract allows for that - while (!buffer.isEmpty() && buffer.peek().shouldDiscard()) buffer.poll(); - if (!buffer.isEmpty()) - return buffer.peek().read(b, off, len); - // we still wait if there is no data, but let read() implement the blocking - int val = read(); - if (val < 0) - return val; - b[off] = (byte) (val & 0xFF); - return 1 + Math.max(0, read(b, off + 1, len - 1)); - } - - @Override - public int available() throws IOException { - return buffer.stream().map(PendingWrite::available).reduce(0, (i, a) -> i += a); - } - - @Override - public void close() throws IOException { - super.close(); - closed = true; // mark us closed, so that additional writes are NOPed - // if we have any buffered data, flush it and trigger the write completions - while (!buffer.isEmpty()) - buffer.poll().completion.tryComplete(); - // see if we need to call the drain handler to drain upstream - if (everFull.compareAndSet(true, false)) - context.runOnContext(drainHandler::handle); - } - - private class PendingWrite { - Buffer data; - Promise completion; - int position = 0; - - private PendingWrite(Buffer data, Promise completion) { - this.data = data; - this.completion = Objects.requireNonNullElse(completion, Promise.promise()); - } - - public boolean shouldDiscard() { - if (data != null && position >= data.length()) { - completion.tryComplete(); - if (everFull.compareAndSet(true, false)) - context.runOnContext(drainHandler::handle); - return true; - } - return false; - } - - public int available() { - return data == null ? 
0 : data.length(); - } - - public int readNext() { - if (data == null) { - this.completion.tryComplete(); - return -1; - } - int val = 0xFF & data.getByte(position++); // get byte's bitwise value, which is what InputStream#read() is supposed to return - if (position > data.length()) - completion.tryComplete(); - return val; - } - - public int read(byte[] b, int off, int len) { - if (data == null || position >= data.length()) { - completion.tryComplete(); - return data == null ? -1 : 0; - } - int max = Math.min(len, data.length() - position); - data.getBytes(position, position + max, b, off); - position += max; - return max; - } - } -} \ No newline at end of file +public class WriteToInputStream extends InputStream implements WriteStream{ + + private class PendingWrite { + Buffer data; + Promise completion; + int position = 0; + + private PendingWrite(Buffer data, Promise completion) { + this.data = data; + this.completion = Objects.requireNonNullElse(completion, Promise.promise()); + } + + public boolean shouldDiscard() { + if (data != null && position >= data.length()) { + completion.tryComplete(); + if (everFull.compareAndSet(true, false)) + context.runOnContext(drainHandler::handle); + return true; + } + return false; + } + + public int available() { + return data == null ? 0 : data.length(); + } + + public int readNext() { + if (data == null) { + this.completion.tryComplete(); + return -1; + } + int val = 0xFF & data.getByte(position++); // get byte's bitwise value, which is what InputStream#read() is supposed to return + if (position > data.length()) + completion.tryComplete(); + return val; + } + + public int read(byte[] b, int off, int len) { + if (data == null || position >= data.length()) { + completion.tryComplete(); + return data == null ? 
-1 : 0; + } + int max = Math.min(len, data.length() - position); + data.getBytes(position, position + max, b, off); + position += max; + return max; + } + } + + private Handler drainHandler = __ -> {}; + private Handler errorHandler = t -> {}; + private volatile int maxSize = 1000; + private volatile int maxBufferSize = Integer.MAX_VALUE; + private ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); + private AtomicBoolean everFull = new AtomicBoolean(); + private volatile boolean closed = false; + private ConcurrentLinkedQueue readsWaiting = new ConcurrentLinkedQueue<>(); + private Context context; + + public WriteToInputStream(Vertx vertx) { + context = vertx.getOrCreateContext(); + } + + /** + * Runs {@link #transferTo(OutputStream)} as a blocking task in the Vert.x context + * @param os Output stream to transfer the contents of this {@linkplain WriteStream} to + * @return a promise that will resolve when the {@linkplain WriteStream} has been closed + * and all data successfully transfered to the {@linkplain OutputStream}, or reject if + * there was either a {@linkplain WriteStream} error or an {@linkplain IOException} + */ + public Future wrap(OutputStream os) { + return context.executeBlocking(() -> { + try (os) { + transferTo(os); + return null; + } + }) + .onFailure(t -> { + if (errorHandler != null) + errorHandler.handle(t); + }) + .mapEmpty(); + } + + /* WriteStream stuff */ + + @Override + public WriteToInputStream drainHandler(Handler handler) { + this.drainHandler = handler; + return this; + } + + @Override + public Future end() { + return write(null); + } + + @Override + public WriteToInputStream exceptionHandler(Handler handler) { + // we don't have a way to propagate errors as we don't actually handle writing out and InputStream provides no feedback mechanism. 
+ errorHandler = handler; + return this; + } + + @Override + public Future write(Buffer data) { + if (closed) + // accept all data and discard it, as unlike JDK9 Flow, we have no way to tell upstream to stop sending data + return Future.succeededFuture(); + var promise = Promise.promise(); + if (data == null) // end of stream + buffer.add(new PendingWrite(null, promise)); + else + for (int start = 0; start < data.length();) { + var buf = data.length() < maxBufferSize ? data : data.getBuffer(start, Math.min(data.length(), start + maxBufferSize)); + start += buf.length(); + buffer.add(new PendingWrite(buf, start < data.length() ? null : promise)); + } + // flush waiting reads, if any + for (var l = readsWaiting.poll(); l != null; l = readsWaiting.poll()) l.countDown(); + return promise.future(); + } + + @Override + public WriteToInputStream setWriteQueueMaxSize(int maxSize) { + this.maxSize = maxSize; + return this; + } + + public WriteToInputStream setMaxChunkSize(int maxSize) { + maxBufferSize = maxSize; + return this; + } + + @Override + public boolean writeQueueFull() { + if (buffer.size() < maxSize) + return false; + everFull.compareAndSet(false, true); + return true; + } + + /* InputStream stuff */ + + @Override + synchronized public int read() throws IOException { + while (true) { + while (!buffer.isEmpty() && buffer.peek().shouldDiscard()) buffer.poll(); + if (!buffer.isEmpty()) + return buffer.peek().readNext(); + // set latch to signal we are waiting + var latch = new CountDownLatch(1); + readsWaiting.add(latch); + if (buffer.isEmpty()) + try { + latch.await(); + } catch (InterruptedException e) { + throw new IOException("Failed to wait for data", e); + } + // now try to read again + } + } + + @Override + synchronized public int read(byte[] b, int off, int len) throws IOException { + if (b.length < off + len) // sanity first + return 0; + // we are going to be lazy here and not read more than one pending write, even if there are more available. 
The contract allows for that + while (!buffer.isEmpty() && buffer.peek().shouldDiscard()) buffer.poll(); + if (!buffer.isEmpty()) + return buffer.peek().read(b, off, len); + // we still wait if there is no data, but let read() implement the blocking + int val = read(); + if (val < 0) + return val; + b[off] = (byte) (val & 0xFF); + return 1 + Math.max(0, read(b, off + 1, len - 1)); + } + + @Override + public int available() throws IOException { + return buffer.stream().map(PendingWrite::available).reduce(0, (i,a) -> i += a); + } + + @Override + public void close() throws IOException { + super.close(); + closed = true; // mark us closed, so that additional writes are NOPed + // if we have any buffered data, flush it and trigger the write completions + while (!buffer.isEmpty()) + buffer.poll().completion.tryComplete(); + // see if we need to call the drain handler to drain upstream + if (everFull.compareAndSet(true, false)) + context.runOnContext(drainHandler::handle); + } +} diff --git a/src/test/java/io/cloudonix/vertx/javaio/OutputToReadStreamTest.java b/src/test/java/io/cloudonix/vertx/javaio/OutputToReadStreamTest.java index fd9efb0..a442dd3 100644 --- a/src/test/java/io/cloudonix/vertx/javaio/OutputToReadStreamTest.java +++ b/src/test/java/io/cloudonix/vertx/javaio/OutputToReadStreamTest.java @@ -51,13 +51,13 @@ public WriteStream exceptionHandler(Handler handler) { return this; } - @Override - public Future end() { - resultHandler.handle(null); - return Future.succeededFuture(); - } + @Override + public Future end() { + resultHandler.handle(null); + return Future.succeededFuture(); + } - @Override + @Override public WriteStream drainHandler(Handler handler) { return this; } diff --git a/src/test/java/io/cloudonix/vertx/javaio/WriteToInputStreamTest.java b/src/test/java/io/cloudonix/vertx/javaio/WriteToInputStreamTest.java index e68f17c..694a67f 100644 --- a/src/test/java/io/cloudonix/vertx/javaio/WriteToInputStreamTest.java +++ 
package io.cloudonix.vertx.javaio;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.*;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.System.Logger;
import java.lang.System.Logger.Level;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;

/**
 * Tests for {@link WriteToInputStream}: piping Vert.x {@code ReadStream<Buffer>}
 * sources into the blocking {@link java.io.InputStream} side, wrapping a Java
 * {@link java.io.OutputStream} sink, and low-level single/multi-byte reads.
 */
@ExtendWith(VertxExtension.class)
class WriteToInputStreamTest {

	private final static Logger log = System.getLogger(WriteToInputStream.class.getName());

	/**
	 * A one-shot ReadStream emits "hello world" 100ms after the pipe resumes it,
	 * then signals EOF; a blocking transferTo() on a worker thread must observe
	 * exactly that payload.
	 */
	@Test
	void test(Vertx vertx, VertxTestContext ctx) throws IOException, InterruptedException {
		log.log(Level.INFO, "Starting test");
		var text = "hello world";
		var sink = new ByteArrayOutputStream();
		var cp = ctx.checkpoint();
		var stream = new WriteToInputStream(vertx);
		(new ReadStream<Buffer>() {
			private Handler<Void> endHandler;
			private Handler<Buffer> handler;

			@Override
			public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
				return this;
			}

			@Override
			public ReadStream<Buffer> handler(Handler<Buffer> handler) {
				this.handler = handler;
				return this;
			}

			@Override
			public ReadStream<Buffer> pause() {
				return this;
			}

			@Override
			public ReadStream<Buffer> resume() {
				// emit the payload once, shortly after the pipe starts reading, then signal EOF
				vertx.setTimer(100, i -> {
					handler.handle(Buffer.buffer(text));
					endHandler.handle(null);
				});
				return this;
			}

			@Override
			public ReadStream<Buffer> fetch(long amount) {
				return this;
			}

			@Override
			public ReadStream<Buffer> endHandler(Handler<Void> endHandler) {
				this.endHandler = endHandler;
				return this;
			}
		}).pipeTo(stream).onFailure(ctx::failNow);
		vertx.executeBlocking(() -> {
			stream.transferTo(sink);
			stream.close();
			sink.close();
			return sink.toByteArray();
		})
		.onComplete(ctx.succeeding(res -> {
			assertThat(res, is(equalTo(text.getBytes())));
			cp.flag();
		}));
	}

	/**
	 * Emits the payload 10000 times, one buffer per 1ms timer tick, and verifies
	 * the blocking side reassembles every chunk in order.
	 */
	@Test
	void testLargeTransfer(Vertx vertx, VertxTestContext ctx) throws IOException, InterruptedException {
		log.log(Level.INFO, "Starting testLargeTransfer");
		var text = "hello world";
		int count = 10000;
		var sink = new ByteArrayOutputStream();
		var cp = ctx.checkpoint();
		var stream = new WriteToInputStream(vertx);
		(new ReadStream<Buffer>() {
			private int remaining = count;
			private Handler<Void> endHandler;
			private Handler<Buffer> handler;

			@Override
			public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
				return this;
			}

			@Override
			public ReadStream<Buffer> handler(Handler<Buffer> handler) {
				this.handler = handler;
				return this;
			}

			@Override
			public ReadStream<Buffer> pause() {
				return this;
			}

			// send one chunk per timer tick until the count is exhausted, then EOF
			private void transfer() {
				if (remaining-- > 0) {
					handler.handle(Buffer.buffer(text));
					vertx.setTimer(1, i -> transfer());
				} else
					endHandler.handle(null);
			}

			@Override
			public ReadStream<Buffer> resume() {
				vertx.setTimer(100, i -> transfer());
				return this;
			}

			@Override
			public ReadStream<Buffer> fetch(long amount) {
				return this;
			}

			@Override
			public ReadStream<Buffer> endHandler(Handler<Void> endHandler) {
				this.endHandler = endHandler;
				return this;
			}
		}).pipeTo(stream).onSuccess(v -> {
		}).onFailure(ctx::failNow);
		vertx.executeBlocking(() -> {
			stream.transferTo(sink);
			stream.close();
			sink.close();
			return sink.toByteArray();
		})
		.onComplete(ctx.succeeding(res -> {
			var bres = Buffer.buffer(res);
			var test = text.getBytes();
			for (int i = 0; i < count; i++) {
				assertThat(bres.getBuffer(i * test.length, (i + 1) * test.length).getBytes(), is(equalTo(test)));
			}
			cp.flag();
		}));
	}

	/**
	 * wrap() should copy everything written through end() into the wrapped
	 * OutputStream and complete the returned future.
	 */
	@Test
	public void testConvert(Vertx vertx, VertxTestContext ctx) throws IOException {
		log.log(Level.INFO, "Starting testConvert");
		var sink = new ByteArrayOutputStream();
		// explicit type witness: a bare Promise.promise() under var infers Promise<Object>
		var result = Promise.<Void>promise();
		var os = new WriteToInputStream(vertx);
		os.wrap(sink).onComplete(result);
		os.end(Buffer.buffer("hello world"));
		result.future()
			.map(__ -> {
				log.log(Level.INFO, "Testing output stream result...");
				assertThat(sink.toByteArray(), is(equalTo("hello world".getBytes())));
				return null;
			})
			.onComplete(ctx.succeedingThenComplete())
			.onComplete(__ -> ctx.verify(os::close));
	}

	/**
	 * With a 10-byte max chunk size the payload is re-chunked internally, but the
	 * wrapped sink must still receive the bytes intact and in order.
	 */
	@Test
	public void testReChunkedWrites(Vertx vertx, VertxTestContext ctx) throws IOException {
		log.log(Level.INFO, "Starting testReChunkedWrites");
		var data = "hello world, this is a longish text which will be chunks";
		var sink = new ByteArrayOutputStream();
		var result = Promise.<Void>promise();
		var os = new WriteToInputStream(vertx);
		os.setMaxChunkSize(10).wrap(sink).onComplete(result);
		os.end(Buffer.buffer(data));
		result.future()
			.map(__ -> {
				log.log(Level.INFO, "Testing output stream result...");
				assertThat(sink.toByteArray(), is(equalTo(data.getBytes())));
				return null;
			})
			.onComplete(ctx.succeedingThenComplete())
			.onComplete(__ -> ctx.verify(os::close));
	}

	/**
	 * A read() issued before any data arrives must block until the single byte is
	 * written (after a deliberate 1s delay) and then return a count of 1.
	 */
	@Test
	public void testSingleByte(Vertx vertx, VertxTestContext ctx) throws IOException {
		log.log(Level.INFO, "Starting testSingleByte");
		var data = new byte[] { 0x31 };
		var sink = new byte[4];
		var is = new WriteToInputStream(vertx);
		vertx.runOnContext(__ -> {
			Future.succeededFuture()
				.compose(___ -> {
					// NOTE(review): deliberately delays the write so the read below blocks;
					// this sleeps on a Vert.x context, which is acceptable only in a test
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						Thread.currentThread().interrupt(); // preserve interrupt status instead of swallowing
					}
					return is.end(Buffer.buffer(data));
				})
				.onComplete(ctx.succeedingThenComplete());
		});
		var read = is.read(sink, 0, sink.length);
		assertThat(sink, is(equalTo(new byte[] { 0x31, 0x00, 0x00, 0x00 })));
		assertThat(read, is(equalTo(1)));
		is.close();
	}

	/**
	 * Same as {@link #testSingleByte} with a two-byte payload: read() must return
	 * both available bytes in one call without waiting to fill the array.
	 */
	@Test
	public void testTwoBytes(Vertx vertx, VertxTestContext ctx) throws IOException {
		log.log(Level.INFO, "Starting testTwoBytes");
		var data = new byte[] { 0x31, 0x32 };
		var sink = new byte[4];
		var is = new WriteToInputStream(vertx);
		vertx.runOnContext(__ -> {
			Future.succeededFuture()
				.compose(___ -> {
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						Thread.currentThread().interrupt(); // preserve interrupt status instead of swallowing
					}
					return is.end(Buffer.buffer(data));
				})
				.onComplete(ctx.succeedingThenComplete());
		});
		var read = is.read(sink, 0, sink.length);
		assertThat(sink, is(equalTo(new byte[] { 0x31, 0x32, 0x00, 0x00 })));
		assertThat(read, is(equalTo(2)));
		is.close();
	}
}