33import java .io .IOException ;
44import java .io .InputStream ;
55import java .io .OutputStream ;
6+ import java .util .Objects ;
67import java .util .concurrent .CountDownLatch ;
8+ import java .util .concurrent .ForkJoinPool ;
79import java .util .concurrent .atomic .AtomicLong ;
810import java .util .concurrent .atomic .AtomicReference ;
911
12+ import io .vertx .core .AsyncResult ;
1013import io .vertx .core .Context ;
14+ import io .vertx .core .Future ;
1115import io .vertx .core .Handler ;
16+ import io .vertx .core .Promise ;
1217import io .vertx .core .Vertx ;
1318import io .vertx .core .buffer .Buffer ;
1419import io .vertx .core .streams .ReadStream ;
20+ import io .vertx .core .streams .WriteStream ;
1521
22+ /**
23+ * A conversion utility to help move data from a Java classic blocking IO to a Vert.x asynchronous Stream.
24+ *
25+ * Use this class to create an {@link OutputStream} that pushes data written to it to a {@link ReadStream}
26+ * API.
27+ *
28+ * The ReadStream handlers will be called on a Vert.x context, and the {@link #close()} method must be called
29+ * for the ReadStream end handler to be triggered.
30+ *
31+ * It is recommended to use this class in the context of a blocking try-with-resources block, to
32+ * ensure that streams are closed properly. For example:
33+ *
34+ * <tt><pre>
35+ * try (final OutputToReadStream os = new OutputToReadStream(vertx); final InputStream is = getInput()) {
36+ * os.pipeTo(someWriteStream);
37+ * is.transferTo(os);
38+ * }
39+ * </pre></tt>
40+ *
41+ * @author guss77
42+ */
1643public class OutputToReadStream extends OutputStream implements ReadStream <Buffer > {
1744
1845 private AtomicReference <CountDownLatch > paused = new AtomicReference <>(new CountDownLatch (0 ));
@@ -28,26 +55,50 @@ public OutputToReadStream(Vertx vertx) {
2855 }
2956
3057 /**
31- * Helper utility to "convert" a Java {@link InputStream} to a {@link ReadStream}<code><Buffer></code> .
58+ * Helper utility to pipe a Java {@link InputStream} to a {@link WriteStream} .
3259 *
33- * This method uses {@link InputStream#transferTo(OutputStream)}, and in addition it will attempt to close the
34- * InputStream once it is done transferring its data, and call this ReadStream's end handler if no error occurred .
60+ * This method is non-blocking and Vert.x context safe. It uses the common ForkJoinPool to perform the
61+ * Java blocking IO and will try to propagate IO failures to the returned {@link Future} .
3562 *
36- * If an error has occurred, the end handler is not guaranteed to be called, and a "best effort" attempt will be made
37- * to close the input stream.
38- * @param is InputStream instance to wrap
39- * @return this instance, which will immediately start producing data from the specified input stream
63+ * This method uses {@link InputStream#transferTo(OutputStream)} to copy all the data, and will then
64+ * attempt to close both streams asynchronously. Some Java compilers might not detect that the streams
65+ * will be safely closed and will issue leak warnings.
66+ *
67+ * @param source InputStream to drain
68+ * @param sink WriteStream to pipe data to
69+ * @return a Future that will succeed when all the data have been written and the streams closed, or fail if an
70+ * {@link IOException} has occurred
4071 */
41- public OutputToReadStream wrap (InputStream is ) {
42- context .executeBlocking (p -> {
43- try (is ) {
44- is .transferTo (this );
45- p .complete ();
46- } catch (Throwable e ) {
47- p .fail (e );
72+ public Future <Void > pipeFromInput (InputStream source , WriteStream <Buffer > sink ) {
73+ Promise <Void > promise = Promise .promise ();
74+ pipeTo (sink , promise );
75+ ForkJoinPool .commonPool ().submit (() -> {
76+ try (final InputStream is = source ; final OutputStream os = this ){
77+ source .transferTo (this );
78+ } catch (IOException e ) {
79+ promise .tryFail (e );
4880 }
49- }).onFailure (errorHandler ::handle ).onSuccess (v -> endHandler .handle (null ));
50- return this ;
81+ });
82+ return promise .future ();
83+ }
84+
85+ /**
86+ * Helper utility to pipe a Java {@link InputStream} to a {@link WriteStream}.
87+ *
88+ * This method is non-blocking and Vert.x context safe. It uses the common ForkJoinPool to perform the
89+ * Java blocking IO and will try to propagate IO failures to the returned {@link Future}
90+ *
91+ * This method uses {@link InputStream#transferTo(OutputStream)} to copy all the data, and will then
92+ * attempt to close both streams asynchronously. Some Java compilers might not detect that the streams
93+ * will be safely closed and will issue leak warnings.
94+ *
95+ * @param source InputStream to drain
96+ * @param sink WriteStream to pipe data to
97+ * @param handler a handler that will be called when all the data have been written and the streams closed,
98+ * or if an {@link IOException} has occurred.
99+ */
100+ public void pipeFromInput (InputStream source , WriteStream <Buffer > sink , Handler <AsyncResult <Void >> handler ) {
101+ pipeFromInput (source , sink ).onComplete (handler );
51102 }
52103
53104 /**
@@ -71,13 +122,13 @@ public void sendError(Throwable t) {
71122 public OutputToReadStream exceptionHandler (Handler <Throwable > handler ) {
72123 // we are usually not propagating exceptions as OutputStream has no mechanism for propagating exceptions down,
73124 // except when wrapping an input stream, in which case we can forward InputStream read errors to the error handler.
74- errorHandler = handler ;
125+ errorHandler = Objects . requireNonNullElse ( handler , t -> {}) ;
75126 return this ;
76127 }
77128
78129 @ Override
79130 public OutputToReadStream handler (Handler <Buffer > handler ) {
80- this .dataHandler = handler ;
131+ this .dataHandler = Objects . requireNonNullElse ( handler , d -> {}) ;
81132 return this ;
82133 }
83134
@@ -102,7 +153,7 @@ public OutputToReadStream fetch(long amount) {
102153
103154 @ Override
104155 public OutputToReadStream endHandler (Handler <Void > endHandler ) {
105- this .endHandler = endHandler ;
156+ this .endHandler = Objects . requireNonNullElse ( endHandler , v -> {}) ;
106157 return this ;
107158 }
108159
@@ -134,6 +185,8 @@ synchronized public void write(byte[] b, int off, int len) throws IOException {
134185
135186 @ Override
136187 synchronized public void close () throws IOException {
188+ if (closed )
189+ return ;
137190 closed = true ;
138191 try {
139192 paused .get ().await ();
@@ -155,10 +208,7 @@ private void push(Buffer data) {
155208 dataHandler .handle (data );
156209 awaiter .countDown ();
157210 } catch (Throwable t ) {
158- if (errorHandler != null )
159- errorHandler .handle (t );
160- else
161- System .err .println ("Unexpected exception in OutputToReadStream and no error handler: " + t );
211+ errorHandler .handle (t );
162212 }
163213 });
164214 try {
0 commit comments