diff --git a/README.md b/README.md new file mode 100644 index 00000000..44f8e028 --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +- [ ] Exercise 1 --> Data Streams +- [ ] Exercise 2 --> +- [ ] Exercise 3 --> +- [ ] Exercise 4 --> +- [ ] Exercise 5 --> +- [ ] Exercise 6 --> diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise1.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise1.java index decda79f..5679b8cb 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise1.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise1.java @@ -1,5 +1,8 @@ package io.javabrains.reactiveworkshop; +import java.util.List; +import java.util.stream.Collectors; + public class Exercise1 { public static void main(String[] args) { @@ -8,23 +11,49 @@ public static void main(String[] args) { // Print all numbers in the intNumbersStream stream // TODO: Write code here + StreamSources.intNumbersStream() + .forEach(System.out::println); // Print numbers from intNumbersStream that are less than 5 // TODO: Write code here + StreamSources.intNumbersStream() + .filter(number -> number < 5) + .forEach(System.out::println); // Print the second and third numbers in intNumbersStream that's greater than 5 // TODO: Write code here + StreamSources.intNumbersStream() + .filter(number -> number < 5) + .skip(1) + .limit(2) + .forEach(System.out::println); // Print the first number in intNumbersStream that's greater than 5. // If nothing is found, print -1 // TODO: Write code here + Integer value = StreamSources.intNumbersStream() + .filter(number -> number > 5) + .findFirst().orElse(-1); + System.out.println(value); // Print first names of all users in userStream // TODO: Write code here + StreamSources.userStream().forEach(user -> System.out.println(user.getFirstName())); // Print first names in userStream for users that have IDs from number stream // TODO: Write code here - + List numbers = StreamSources.intNumbersStream().collect(Collectors.toList()); + + StreamSources.userStream() + .filter(user -> numbers.contains(user.getId())) + .forEach(user -> System.out.println(user.getFirstName())); + + // OR + StreamSources.intNumbersStream() + .flatMap(id -> StreamSources.userStream() + .filter(user -> user.getId() == id)) + .map(user -> user.getFirstName()) + .forEach(System.out::println); } } diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise2.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise2.java index 6ddbf309..dd61011b 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise2.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise2.java @@ -10,9 +10,13 @@ public static void main(String[] args) throws IOException { // Print all numbers in the ReactiveSources.intNumbersFlux stream // TODO: Write code here + ReactiveSources.intNumbersFlux() + .subscribe(integer -> System.out.println(integer)); // Print all users in the ReactiveSources.userFlux stream // TODO: Write code here + ReactiveSources.userFlux() + .subscribe(user -> System.out.println(user)); System.out.println("Press a key to end"); System.in.read(); diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise3.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise3.java index 96819d99..c7de5471 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise3.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise3.java @@ -1,6 +1,7 @@ package io.javabrains.reactiveworkshop; import java.io.IOException; +import java.util.List; public class Exercise3 { @@ -11,6 +12,9 @@ public static void main(String[] args) throws IOException { // Get all numbers in the ReactiveSources.intNumbersFlux stream // into a List and print the list and its size // TODO: Write code here + List intNumbersList = ReactiveSources.intNumbersFlux().toStream().toList(); + //Takes 10s (because delay was set in the flux generation) + System.out.println("List of numbers: " + intNumbersList); System.out.println("Press a key to end"); diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise4.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise4.java index f5e91411..802563f1 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise4.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise4.java @@ -1,6 +1,7 @@ package io.javabrains.reactiveworkshop; import java.io.IOException; +import java.util.Optional; public class Exercise4 { @@ -8,11 +9,23 @@ public static void main(String[] args) throws IOException { // Use ReactiveSources.intNumberMono() - // Print the value from intNumberMono when it emits + // TODO: Write code here + // NOTE : non-blocking call + ReactiveSources.intNumberMono().subscribe(number -> System.out.println(number)); // Get the value from the Mono into an integer variable - // TODO: Write code here + // NOTE : blocking call + Integer number = ReactiveSources.intNumberMono().block(); + System.out.println("After blocking: " + number); + + // OR + + Optional numberOptional = ReactiveSources.intNumberMono().blockOptional(); + System.out.println("After blocking optional: " + (numberOptional.isPresent() ? numberOptional.get() : 0)); + + // NOTE : Mono can return a list + System.out.println("Press a key to end"); System.in.read(); diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise5.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise5.java index 754e662f..eb5892f5 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise5.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise5.java @@ -1,21 +1,50 @@ package io.javabrains.reactiveworkshop; +import org.reactivestreams.Subscription; +import reactor.core.publisher.BaseSubscriber; + import java.io.IOException; public class Exercise5 { public static void main(String[] args) throws IOException { - // Use ReactiveSources.intNumberMono() and ReactiveSources.userMono() + // Use ReactiveSources.intNumberFlux() and ReactiveSources.userMono() // Subscribe to a flux using the error and completion hooks // TODO: Write code here + ReactiveSources.intNumbersFlux().subscribe( + number -> System.out.println(number), + error -> System.out.println(error.getMessage()), + () -> System.out.println("Completed !!") + ); + + ReactiveSources.userMono().subscribe( + user -> System.out.println(user), + error -> System.out.println(error.getMessage()), + () -> System.out.println("Completed !!") + ); + + // NOTE : The above two subscribers don't go one after the other, the order is unpredictable. // Subscribe to a flux using an implementation of BaseSubscriber // TODO: Write code here + ReactiveSources.intNumbersFlux().subscribe(new MySubscriber<>()); System.out.println("Press a key to end"); System.in.read(); } + static class MySubscriber extends BaseSubscriber { + public void hookOnSubscribe(Subscription subscription) { + System.out.println("Subscribe happened."); + request(1); + } + + public void hookOnNext(T value) { + System.out.println(value.toString() + " received."); + request(1); + } + } + } \ No newline at end of file