diff --git a/pom.xml b/pom.xml index dad71a3..bb72461 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ Utilities for integrating Vert.x IO with Java IO - 4.1.8 + 5.0.5 11 11 11 diff --git a/src/main/java/io/cloudonix/vertx/javaio/OutputToReadStream.java b/src/main/java/io/cloudonix/vertx/javaio/OutputToReadStream.java index f2b3792..765d3b3 100644 --- a/src/main/java/io/cloudonix/vertx/javaio/OutputToReadStream.java +++ b/src/main/java/io/cloudonix/vertx/javaio/OutputToReadStream.java @@ -4,6 +4,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Objects; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicLong; @@ -41,7 +42,7 @@ * @author guss77 */ public class OutputToReadStream extends OutputStream implements ReadStream { - + private AtomicReference paused = new AtomicReference<>(new CountDownLatch(0)); private boolean closed; private AtomicLong demand = new AtomicLong(0); @@ -49,21 +50,21 @@ public class OutputToReadStream extends OutputStream implements ReadStream dataHandler = d -> {}; private Handler errorHandler = t -> {}; private Context context; - + public OutputToReadStream(Vertx vertx) { context = vertx.getOrCreateContext(); } - + /** * Helper utility to pipe a Java {@link InputStream} to a {@link WriteStream}. - * + * * This method is non-blocking and Vert.x context safe. It uses the common ForkJoinPool to perform the * Java blocking IO and will try to propagate IO failures to the returned {@link Future}. - * + * * This method uses {@link InputStream#transferTo(OutputStream)} to copy all the data, and will then * attempt to close both streams asynchronously. Some Java compilers might not detect that the streams * will be safely closed and will issue leak warnings. - * + * * @param source InputStream to drain * @param sink WriteStream to pipe data to * @return a Future that will succeed when all the data have been written and the streams closed, or fail if an @@ -71,9 +72,9 @@ public OutputToReadStream(Vertx vertx) { */ public Future pipeFromInput(InputStream source, WriteStream sink) { Promise promise = Promise.promise(); - pipeTo(sink, promise); + pipeTo(sink).onComplete(promise); ForkJoinPool.commonPool().submit(() -> { - try (final InputStream is = source; final OutputStream os = this){ + try (final InputStream is = source; final OutputStream os = this) { source.transferTo(this); } catch (IOException e) { promise.tryFail(e); @@ -81,17 +82,17 @@ public Future pipeFromInput(InputStream source, WriteStream sink) }); return promise.future(); } - + /** * Helper utility to pipe a Java {@link InputStream} to a {@link WriteStream}. - * + * * This method is non-blocking and Vert.x context safe. It uses the common ForkJoinPool to perform the * Java blocking IO and will try to propagate IO failures to the returned {@link Future} - * + * * This method uses {@link InputStream#transferTo(OutputStream)} to copy all the data, and will then * attempt to close both streams asynchronously. Some Java compilers might not detect that the streams * will be safely closed and will issue leak warnings. - * + * * @param source InputStream to drain * @param sink WriteStream to pipe data to * @param handler a handler that will be called when all the data have been written and the streams closed, @@ -100,24 +101,21 @@ public Future pipeFromInput(InputStream source, WriteStream sink) public void pipeFromInput(InputStream source, WriteStream sink, Handler> handler) { pipeFromInput(source, sink).onComplete(handler); } - + /** - * Propagate an out-of-band error (likely generated or handled by the code that feeds the output stream) to the end of the + * Propagate an out-of-band error (likely generated or handled by the code that feeds the output stream) to the end of the * read stream to let them know that the result is not going to be good. * @param t error to be propagated down the stream */ public void sendError(Throwable t) { - context.executeBlocking(p -> { - try { - errorHandler.handle(t); - } finally { - p.tryComplete(); - } + context.executeBlocking((Callable) () -> { + errorHandler.handle(t); + return null; }); } - + /* ReadStream stuff */ - + @Override public OutputToReadStream exceptionHandler(Handler handler) { // we are usually not propagating exceptions as OutputStream has no mechanism for propagating exceptions down, @@ -158,7 +156,7 @@ public OutputToReadStream endHandler(Handler endHandler) { } /* OutputStream stuff */ - + @Override synchronized public void write(int b) throws IOException { if (closed) @@ -170,7 +168,7 @@ synchronized public void write(int b) throws IOException { } push(Buffer.buffer(1).appendByte((byte) (b & 0xFF))); } - + @Override synchronized public void write(byte[] b, int off, int len) throws IOException { if (closed) @@ -195,9 +193,9 @@ synchronized public void close() throws IOException { } push(null); } - + /* Internal implementation */ - + private void push(Buffer data) { var awaiter = new CountDownLatch(1); context.runOnContext(v -> { @@ -216,5 +214,5 @@ private void push(Buffer data) { awaiter.await(); } catch (InterruptedException e) { } } - + } diff --git a/src/main/java/io/cloudonix/vertx/javaio/WriteToInputStream.java b/src/main/java/io/cloudonix/vertx/javaio/WriteToInputStream.java index 5412e79..52228d4 100644 --- a/src/main/java/io/cloudonix/vertx/javaio/WriteToInputStream.java +++ b/src/main/java/io/cloudonix/vertx/javaio/WriteToInputStream.java @@ -28,17 +28,17 @@ * @author guss77 */ public class WriteToInputStream extends InputStream implements WriteStream{ - + private class PendingWrite { Buffer data; Promise completion; int position = 0; - + private PendingWrite(Buffer data, Promise completion) { this.data = data; this.completion = Objects.requireNonNullElse(completion, Promise.promise()); } - + public boolean shouldDiscard() { if (data != null && position >= data.length()) { completion.tryComplete(); @@ -48,11 +48,11 @@ public boolean shouldDiscard() { } return false; } - + public int available() { return data == null ? 0 : data.length(); } - + public int readNext() { if (data == null) { this.completion.tryComplete(); @@ -63,7 +63,7 @@ public int readNext() { completion.tryComplete(); return val; } - + public int read(byte[] b, int off, int len) { if (data == null || position >= data.length()) { completion.tryComplete(); @@ -89,7 +89,7 @@ public int read(byte[] b, int off, int len) { public WriteToInputStream(Vertx vertx) { context = vertx.getOrCreateContext(); } - + /** * Runs {@link #transferTo(OutputStream)} as a blocking task in the Vert.x context * @param os Output stream to transfer the contents of this {@linkplain WriteStream} to @@ -98,21 +98,21 @@ public WriteToInputStream(Vertx vertx) { * there was either a {@linkplain WriteStream} error or an {@linkplain IOException} */ public Future wrap(OutputStream os) { - return context.executeBlocking(p -> { - try (os) { - transferTo(os); - p.complete(); - } catch (Throwable t) { - p.fail(t); - } - }).onFailure(t -> { - if (errorHandler != null) - errorHandler.handle(t); - }).mapEmpty(); + return context.executeBlocking(() -> { + try (os) { + transferTo(os); + return null; + } + }) + .onFailure(t -> { + if (errorHandler != null) + errorHandler.handle(t); + }) + .mapEmpty(); } - + /* WriteStream stuff */ - + @Override public WriteToInputStream drainHandler(Handler handler) { this.drainHandler = handler; @@ -120,9 +120,8 @@ public WriteToInputStream drainHandler(Handler handler) { } @Override - public void end(Handler> handler) { - // signal end of stream by writing a null buffer - write(null, handler); + public Future end() { + return write(null); } @Override @@ -151,17 +150,12 @@ public Future write(Buffer data) { return promise.future(); } - @Override - public void write(Buffer data, Handler> handler) { - write(data).onComplete(handler); - } - @Override public WriteToInputStream setWriteQueueMaxSize(int maxSize) { this.maxSize = maxSize; return this; } - + public WriteToInputStream setMaxChunkSize(int maxSize) { maxBufferSize = maxSize; return this; @@ -176,7 +170,7 @@ public boolean writeQueueFull() { } /* InputStream stuff */ - + @Override synchronized public int read() throws IOException { while (true) { @@ -195,7 +189,7 @@ synchronized public int read() throws IOException { // now try to read again } } - + @Override synchronized public int read(byte[] b, int off, int len) throws IOException { if (b.length < off + len) // sanity first @@ -216,7 +210,7 @@ synchronized public int read(byte[] b, int off, int len) throws IOException { public int available() throws IOException { return buffer.stream().map(PendingWrite::available).reduce(0, (i,a) -> i += a); } - + @Override public void close() throws IOException { super.close(); diff --git a/src/test/java/io/cloudonix/vertx/javaio/OutputToReadStreamTest.java b/src/test/java/io/cloudonix/vertx/javaio/OutputToReadStreamTest.java index 4561dec..a442dd3 100644 --- a/src/test/java/io/cloudonix/vertx/javaio/OutputToReadStreamTest.java +++ b/src/test/java/io/cloudonix/vertx/javaio/OutputToReadStreamTest.java @@ -34,12 +34,7 @@ private TestBufferWriteStream(Buffer output, Handler> resultHa public boolean writeQueueFull() { return false; } - - @Override - public void write(Buffer data, Handler> handler) { - write(data).onComplete(handler); - } - + @Override public Future write(Buffer data) { output.appendBuffer(data); @@ -55,13 +50,13 @@ public WriteStream setWriteQueueMaxSize(int maxSize) { public WriteStream exceptionHandler(Handler handler) { return this; } - + @Override - public void end(Handler> handler) { + public Future end() { resultHandler.handle(null); - handler.handle(Future.succeededFuture()); + return Future.succeededFuture(); } - + @Override public WriteStream drainHandler(Handler handler) { return this; diff --git a/src/test/java/io/cloudonix/vertx/javaio/WriteToInputStreamTest.java b/src/test/java/io/cloudonix/vertx/javaio/WriteToInputStreamTest.java index b998292..694a67f 100644 --- a/src/test/java/io/cloudonix/vertx/javaio/WriteToInputStreamTest.java +++ b/src/test/java/io/cloudonix/vertx/javaio/WriteToInputStreamTest.java @@ -22,7 +22,7 @@ @ExtendWith(VertxExtension.class) class WriteToInputStreamTest { - + private final static Logger log = System.getLogger(WriteToInputStream.class.getName()); @Test @@ -72,19 +72,16 @@ public ReadStream endHandler(Handler endHandler) { return this; } }).pipeTo(stream).onFailure(ctx::failNow); - vertx.executeBlocking(p -> { - try { + vertx.executeBlocking(() -> { stream.transferTo(sink); stream.close(); sink.close(); - p.complete(sink.toByteArray()); - } catch (IOException e) { - p.fail(e); - } - }).onComplete(ctx.succeeding(res -> { - assertThat(res, is(equalTo(text.getBytes()))); - cp.flag(); - })); + return sink.toByteArray(); + }) + .onComplete(ctx.succeeding(res -> { + assertThat(res, is(equalTo(text.getBytes()))); + cp.flag(); + })); } @Test @@ -115,7 +112,7 @@ public ReadStream handler(Handler handler) { public ReadStream pause() { return this; } - + private void transfer() { if (remaining-- > 0) { handler.handle(Buffer.buffer(text)); @@ -142,25 +139,22 @@ public ReadStream endHandler(Handler endHandler) { } }).pipeTo(stream).onSuccess(v -> { }).onFailure(ctx::failNow); - vertx.executeBlocking(p -> { - try { + vertx.executeBlocking(() -> { stream.transferTo(sink); stream.close(); sink.close(); - p.complete(sink.toByteArray()); - } catch (IOException e) { - p.fail(e); - } - }).onComplete(ctx.succeeding(res -> { - var bres = Buffer.buffer(res); - var test = text.getBytes(); - for (int i = 0; i < count; i++) { - assertThat(bres.getBuffer(i * test.length, (i+1) * test.length).getBytes(), is(equalTo(test))); - } - cp.flag(); - })); + return sink.toByteArray(); + }) + .onComplete(ctx.succeeding(res -> { + var bres = Buffer.buffer(res); + var test = text.getBytes(); + for (int i = 0; i < count; i++) { + assertThat(bres.getBuffer(i * test.length, (i + 1) * test.length).getBytes(), is(equalTo(test))); + } + cp.flag(); + })); } - + @Test public void testConvert(Vertx vertx, VertxTestContext ctx) throws IOException { log.log(Level.INFO, "Starting testConvert"); @@ -178,7 +172,7 @@ public void testConvert(Vertx vertx, VertxTestContext ctx) throws IOException { .onComplete(ctx.succeedingThenComplete()) .onComplete(__ -> ctx.verify(os::close)); } - + @Test public void testReChunkedWrites(Vertx vertx, VertxTestContext ctx) throws IOException { log.log(Level.INFO, "Starting testReChunkedWrites"); @@ -197,7 +191,7 @@ public void testReChunkedWrites(Vertx vertx, VertxTestContext ctx) throws IOExce .onComplete(ctx.succeedingThenComplete()) .onComplete(__ -> ctx.verify(os::close)); } - + @Test public void testSingleByte(Vertx vertx, VertxTestContext ctx) throws IOException { log.log(Level.INFO, "Starting testSingleByte"); @@ -219,7 +213,7 @@ public void testSingleByte(Vertx vertx, VertxTestContext ctx) throws IOException assertThat(read, is(equalTo(1))); is.close(); } - + @Test public void testTwoBytes(Vertx vertx, VertxTestContext ctx) throws IOException { log.log(Level.INFO, "Starting testTwoBytes");