From 0fd21125f6bf58aaa044a76b9e5011683e4655fa Mon Sep 17 00:00:00 2001 From: Z007JB2 Date: Fri, 25 Aug 2023 00:18:51 +0530 Subject: [PATCH 1/6] finished exercise 1 --- README.md | 6 +++++ .../reactiveworkshop/Exercise1.java | 23 +++++++++++++++++++ 2 files changed, 29 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 00000000..44f8e028 --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +- [ ] Exercise 1 --> Data Streams +- [ ] Exercise 2 --> +- [ ] Exercise 3 --> +- [ ] Exercise 4 --> +- [ ] Exercise 5 --> +- [ ] Exercise 6 --> diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise1.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise1.java index decda79f..8e8a2fc5 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise1.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise1.java @@ -1,5 +1,8 @@ package io.javabrains.reactiveworkshop; +import java.util.List; +import java.util.stream.Collectors; + public class Exercise1 { public static void main(String[] args) { @@ -8,22 +11,42 @@ public static void main(String[] args) { // Print all numbers in the intNumbersStream stream // TODO: Write code here + StreamSources.intNumbersStream() + .forEach(System.out::println); // Print numbers from intNumbersStream that are less than 5 // TODO: Write code here + StreamSources.intNumbersStream() + .filter(number -> number < 5) + .forEach(System.out::println); // Print the second and third numbers in intNumbersStream that's greater than 5 // TODO: Write code here + StreamSources.intNumbersStream() + .filter(number -> number < 5) + .skip(1) + .limit(2) + .forEach(System.out::println); // Print the first number in intNumbersStream that's greater than 5. // If nothing is found, print -1 // TODO: Write code here + Integer value = StreamSources.intNumbersStream() + .filter(number -> number > 5) + .findFirst().orElse(-1); + System.out.println(value); // Print first names of all users in userStream // TODO: Write code here + StreamSources.userStream().forEach(user -> System.out.println(user.getFirstName())); // Print first names in userStream for users that have IDs from number stream // TODO: Write code here + List numbers = StreamSources.intNumbersStream().collect(Collectors.toList()); + + StreamSources.userStream() + .filter(user -> numbers.contains(user.getId())) + .forEach(user -> System.out.println(user.getFirstName())); } From 6f8253411bfcc563edbfc0220df164d57c6b7c6d Mon Sep 17 00:00:00 2001 From: Z007JB2 Date: Fri, 25 Aug 2023 18:21:25 +0530 Subject: [PATCH 2/6] exercise 1 done --- src/main/java/io/javabrains/reactiveworkshop/Exercise1.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise1.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise1.java index 8e8a2fc5..5679b8cb 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise1.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise1.java @@ -48,6 +48,12 @@ public static void main(String[] args) { .filter(user -> numbers.contains(user.getId())) .forEach(user -> System.out.println(user.getFirstName())); + // OR + StreamSources.intNumbersStream() + .flatMap(id -> StreamSources.userStream() + .filter(user -> user.getId() == id)) + .map(user -> user.getFirstName()) + .forEach(System.out::println); } } From c243e394c47227f0f9a79eb5cd804573ac50fd04 Mon Sep 17 00:00:00 2001 From: Z007JB2 Date: Tue, 29 Aug 2023 23:44:06 +0530 Subject: [PATCH 3/6] exercise 2 done --- src/main/java/io/javabrains/reactiveworkshop/Exercise2.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise2.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise2.java index 6ddbf309..dd61011b 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise2.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise2.java @@ -10,9 +10,13 @@ public static void main(String[] args) throws IOException { // Print all numbers in the ReactiveSources.intNumbersFlux stream // TODO: Write code here + ReactiveSources.intNumbersFlux() + .subscribe(integer -> System.out.println(integer)); // Print all users in the ReactiveSources.userFlux stream // TODO: Write code here + ReactiveSources.userFlux() + .subscribe(user -> System.out.println(user)); System.out.println("Press a key to end"); System.in.read(); From fa1e246255b081846d10e6a5a9a6e129f2fbb78e Mon Sep 17 00:00:00 2001 From: Z007JB2 Date: Tue, 29 Aug 2023 23:52:58 +0530 Subject: [PATCH 4/6] exercise 3 done --- src/main/java/io/javabrains/reactiveworkshop/Exercise3.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise3.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise3.java index 96819d99..c7de5471 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise3.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise3.java @@ -1,6 +1,7 @@ package io.javabrains.reactiveworkshop; import java.io.IOException; +import java.util.List; public class Exercise3 { @@ -11,6 +12,9 @@ public static void main(String[] args) throws IOException { // Get all numbers in the ReactiveSources.intNumbersFlux stream // into a List and print the list and its size // TODO: Write code here + List intNumbersList = ReactiveSources.intNumbersFlux().toStream().toList(); + //Takes 10s (because delay was set in the flux generation) + System.out.println("List of numbers: " + intNumbersList); System.out.println("Press a key to end"); From c3dc7d37242e68fdc2ebbd464fe889846f306c6e Mon Sep 17 00:00:00 2001 From: Z007JB2 Date: Sun, 3 Sep 2023 12:58:15 +0530 Subject: [PATCH 5/6] exercise 4 done - Mono --- .../javabrains/reactiveworkshop/Exercise4.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise4.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise4.java index f5e91411..802563f1 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise4.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise4.java @@ -1,6 +1,7 @@ package io.javabrains.reactiveworkshop; import java.io.IOException; +import java.util.Optional; public class Exercise4 { @@ -8,11 +9,23 @@ public static void main(String[] args) throws IOException { // Use ReactiveSources.intNumberMono() - // Print the value from intNumberMono when it emits + // TODO: Write code here + // NOTE : non-blocking call + ReactiveSources.intNumberMono().subscribe(number -> System.out.println(number)); // Get the value from the Mono into an integer variable - // TODO: Write code here + // NOTE : blocking call + Integer number = ReactiveSources.intNumberMono().block(); + System.out.println("After blocking: " + number); + + // OR + + Optional numberOptional = ReactiveSources.intNumberMono().blockOptional(); + System.out.println("After blocking optional: " + (numberOptional.isPresent() ? numberOptional.get() : 0)); + + // NOTE : Mono can return a list + System.out.println("Press a key to end"); System.in.read(); From 8d5805300d8033afa5eac3f8c2dcbf349cbfe118 Mon Sep 17 00:00:00 2001 From: Z007JB2 Date: Sun, 3 Sep 2023 14:57:42 +0530 Subject: [PATCH 6/6] exercise 5 done - subscriber method signature, base subscriber --- .../reactiveworkshop/Exercise5.java | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise5.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise5.java index 754e662f..eb5892f5 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise5.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise5.java @@ -1,21 +1,50 @@ package io.javabrains.reactiveworkshop; +import org.reactivestreams.Subscription; +import reactor.core.publisher.BaseSubscriber; + import java.io.IOException; public class Exercise5 { public static void main(String[] args) throws IOException { - // Use ReactiveSources.intNumberMono() and ReactiveSources.userMono() + // Use ReactiveSources.intNumberFlux() and ReactiveSources.userMono() // Subscribe to a flux using the error and completion hooks // TODO: Write code here + ReactiveSources.intNumbersFlux().subscribe( + number -> System.out.println(number), + error -> System.out.println(error.getMessage()), + () -> System.out.println("Completed !!") + ); + + ReactiveSources.userMono().subscribe( + user -> System.out.println(user), + error -> System.out.println(error.getMessage()), + () -> System.out.println("Completed !!") + ); + + // NOTE : The above two subscribers don't go one after the other, the order is unpredictable. // Subscribe to a flux using an implementation of BaseSubscriber // TODO: Write code here + ReactiveSources.intNumbersFlux().subscribe(new MySubscriber<>()); System.out.println("Press a key to end"); System.in.read(); } + static class MySubscriber extends BaseSubscriber { + public void hookOnSubscribe(Subscription subscription) { + System.out.println("Subscribe happened."); + request(1); + } + + public void hookOnNext(T value) { + System.out.println(value.toString() + " received."); + request(1); + } + } + } \ No newline at end of file