Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<description>Utilities for integrating Vert.x IO with Java IO</description>

<properties>
<vertx.version>4.1.8</vertx.version>
<vertx.version>5.0.5</vertx.version>
<maven.compiler.release>11</maven.compiler.release>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
Expand Down
52 changes: 25 additions & 27 deletions src/main/java/io/cloudonix/vertx/javaio/OutputToReadStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -41,57 +42,57 @@
* @author guss77
*/
public class OutputToReadStream extends OutputStream implements ReadStream<Buffer> {

private AtomicReference<CountDownLatch> paused = new AtomicReference<>(new CountDownLatch(0));
private boolean closed;
private AtomicLong demand = new AtomicLong(0);
private Handler<Void> endHandler = v -> {};
private Handler<Buffer> dataHandler = d -> {};
private Handler<Throwable> errorHandler = t -> {};
private Context context;

public OutputToReadStream(Vertx vertx) {
context = vertx.getOrCreateContext();
}

/**
* Helper utility to pipe a Java {@link InputStream} to a {@link WriteStream}.
*
*
* This method is non-blocking and Vert.x context safe. It uses the common ForkJoinPool to perform the
* Java blocking IO and will try to propagate IO failures to the returned {@link Future}.
*
*
* This method uses {@link InputStream#transferTo(OutputStream)} to copy all the data, and will then
* attempt to close both streams asynchronously. Some Java compilers might not detect that the streams
* will be safely closed and will issue leak warnings.
*
*
* @param source InputStream to drain
* @param sink WriteStream to pipe data to
* @return a Future that will succeed when all the data have been written and the streams closed, or fail if an
* {@link IOException} has occurred
*/
public Future<Void> pipeFromInput(InputStream source, WriteStream<Buffer> sink) {
Promise<Void> promise = Promise.promise();
pipeTo(sink, promise);
pipeTo(sink).onComplete(promise);
ForkJoinPool.commonPool().submit(() -> {
try (final InputStream is = source; final OutputStream os = this){
try (final InputStream is = source; final OutputStream os = this) {
source.transferTo(this);
} catch (IOException e) {
promise.tryFail(e);
}
});
return promise.future();
}

/**
* Helper utility to pipe a Java {@link InputStream} to a {@link WriteStream}.
*
*
* This method is non-blocking and Vert.x context safe. It uses the common ForkJoinPool to perform the
* Java blocking IO and will try to propagate IO failures to the returned {@link Future}
*
*
* This method uses {@link InputStream#transferTo(OutputStream)} to copy all the data, and will then
* attempt to close both streams asynchronously. Some Java compilers might not detect that the streams
* will be safely closed and will issue leak warnings.
*
*
* @param source InputStream to drain
* @param sink WriteStream to pipe data to
* @param handler a handler that will be called when all the data have been written and the streams closed,
Expand All @@ -100,24 +101,21 @@ public Future<Void> pipeFromInput(InputStream source, WriteStream<Buffer> sink)
public void pipeFromInput(InputStream source, WriteStream<Buffer> sink, Handler<AsyncResult<Void>> handler) {
pipeFromInput(source, sink).onComplete(handler);
}

/**
* Propagate an out-of-band error (likely generated or handled by the code that feeds the output stream) to the end of the
* Propagate an out-of-band error (likely generated or handled by the code that feeds the output stream) to the end of the
* read stream to let them know that the result is not going to be good.
* @param t error to be propagated down the stream
*/
public void sendError(Throwable t) {
context.executeBlocking(p -> {
try {
errorHandler.handle(t);
} finally {
p.tryComplete();
}
context.executeBlocking((Callable<Void>) () -> {
errorHandler.handle(t);
return null;
});
}

/* ReadStream stuff */

@Override
public OutputToReadStream exceptionHandler(Handler<Throwable> handler) {
// we are usually not propagating exceptions as OutputStream has no mechanism for propagating exceptions down,
Expand Down Expand Up @@ -158,7 +156,7 @@ public OutputToReadStream endHandler(Handler<Void> endHandler) {
}

/* OutputStream stuff */

@Override
synchronized public void write(int b) throws IOException {
if (closed)
Expand All @@ -170,7 +168,7 @@ synchronized public void write(int b) throws IOException {
}
push(Buffer.buffer(1).appendByte((byte) (b & 0xFF)));
}

@Override
synchronized public void write(byte[] b, int off, int len) throws IOException {
if (closed)
Expand All @@ -195,9 +193,9 @@ synchronized public void close() throws IOException {
}
push(null);
}

/* Internal implementation */

private void push(Buffer data) {
var awaiter = new CountDownLatch(1);
context.runOnContext(v -> {
Expand All @@ -216,5 +214,5 @@ private void push(Buffer data) {
awaiter.await();
} catch (InterruptedException e) { }
}

}
58 changes: 26 additions & 32 deletions src/main/java/io/cloudonix/vertx/javaio/WriteToInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@
* @author guss77
*/
public class WriteToInputStream extends InputStream implements WriteStream<Buffer>{

private class PendingWrite {
Buffer data;
Promise<Void> completion;
int position = 0;

private PendingWrite(Buffer data, Promise<Void> completion) {
this.data = data;
this.completion = Objects.requireNonNullElse(completion, Promise.promise());
}

public boolean shouldDiscard() {
if (data != null && position >= data.length()) {
completion.tryComplete();
Expand All @@ -48,11 +48,11 @@ public boolean shouldDiscard() {
}
return false;
}

public int available() {
return data == null ? 0 : data.length();
}

public int readNext() {
if (data == null) {
this.completion.tryComplete();
Expand All @@ -63,7 +63,7 @@ public int readNext() {
completion.tryComplete();
return val;
}

public int read(byte[] b, int off, int len) {
if (data == null || position >= data.length()) {
completion.tryComplete();
Expand All @@ -89,7 +89,7 @@ public int read(byte[] b, int off, int len) {
public WriteToInputStream(Vertx vertx) {
context = vertx.getOrCreateContext();
}

/**
* Runs {@link #transferTo(OutputStream)} as a blocking task in the Vert.x context
* @param os Output stream to transfer the contents of this {@linkplain WriteStream} to
Expand All @@ -98,31 +98,30 @@ public WriteToInputStream(Vertx vertx) {
* there was either a {@linkplain WriteStream} error or an {@linkplain IOException}
*/
public Future<Void> wrap(OutputStream os) {
return context.executeBlocking(p -> {
try (os) {
transferTo(os);
p.complete();
} catch (Throwable t) {
p.fail(t);
}
}).onFailure(t -> {
if (errorHandler != null)
errorHandler.handle(t);
}).mapEmpty();
return context.executeBlocking(() -> {
try (os) {
transferTo(os);
return null;
}
})
.onFailure(t -> {
if (errorHandler != null)
errorHandler.handle(t);
})
.mapEmpty();
}

/* WriteStream stuff */

@Override
public WriteToInputStream drainHandler(Handler<Void> handler) {
this.drainHandler = handler;
return this;
}

@Override
public void end(Handler<AsyncResult<Void>> handler) {
// signal end of stream by writing a null buffer
write(null, handler);
public Future<Void> end() {
return write(null);
}

@Override
Expand Down Expand Up @@ -151,17 +150,12 @@ public Future<Void> write(Buffer data) {
return promise.future();
}

@Override
public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
write(data).onComplete(handler);
}

@Override
public WriteToInputStream setWriteQueueMaxSize(int maxSize) {
this.maxSize = maxSize;
return this;
}

public WriteToInputStream setMaxChunkSize(int maxSize) {
maxBufferSize = maxSize;
return this;
Expand All @@ -176,7 +170,7 @@ public boolean writeQueueFull() {
}

/* InputStream stuff */

@Override
synchronized public int read() throws IOException {
while (true) {
Expand All @@ -195,7 +189,7 @@ synchronized public int read() throws IOException {
// now try to read again
}
}

@Override
synchronized public int read(byte[] b, int off, int len) throws IOException {
if (b.length < off + len) // sanity first
Expand All @@ -216,7 +210,7 @@ synchronized public int read(byte[] b, int off, int len) throws IOException {
public int available() throws IOException {
return buffer.stream().map(PendingWrite::available).reduce(0, (i,a) -> i += a);
}

@Override
public void close() throws IOException {
super.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ private TestBufferWriteStream(Buffer output, Handler<AsyncResult<Void>> resultHa
public boolean writeQueueFull() {
return false;
}

@Override
public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
write(data).onComplete(handler);
}


@Override
public Future<Void> write(Buffer data) {
output.appendBuffer(data);
Expand All @@ -55,13 +50,13 @@ public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
return this;
}

@Override
public void end(Handler<AsyncResult<Void>> handler) {
public Future<Void> end() {
resultHandler.handle(null);
handler.handle(Future.succeededFuture());
return Future.succeededFuture();
}

@Override
public WriteStream<Buffer> drainHandler(Handler<Void> handler) {
return this;
Expand Down
Loading