diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise1.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise1.java index decda79f..55e8f0e9 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise1.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise1.java @@ -1,30 +1,46 @@ package io.javabrains.reactiveworkshop; -public class Exercise1 { - - public static void main(String[] args) { - - // Use StreamSources.intNumbersStream() and StreamSources.userStream() - - // Print all numbers in the intNumbersStream stream - // TODO: Write code here - - // Print numbers from intNumbersStream that are less than 5 - // TODO: Write code here +import java.util.List; - // Print the second and third numbers in intNumbersStream that's greater than 5 - // TODO: Write code here - - // Print the first number in intNumbersStream that's greater than 5. - // If nothing is found, print -1 - // TODO: Write code here - - // Print first names of all users in userStream - // TODO: Write code here - - // Print first names in userStream for users that have IDs from number stream - // TODO: Write code here - - } +public class Exercise1 { + public static void main(String[] args) { + + // Use StreamSources.intNumbersStream() and StreamSources.userStream() + + // Print all numbers in the intNumbersStream stream + StreamSources.intNumbersStream().forEach(System.out::println); + + // Print numbers from intNumbersStream that are less than 5 + StreamSources.intNumbersStream().filter(number -> number < 5).forEach(System.out::println); + + // Print the second and third numbers in intNumbersStream that's greater than 5 + StreamSources.intNumbersStream() + .filter(number -> number > 5) + .skip(1L) + .limit(2) + .forEach(System.out::println); + + // Print the first number in intNumbersStream that's greater than 5. + // If nothing is found, print -1 + Integer integer = + StreamSources.intNumbersStream().filter(number -> number > 5).findFirst().orElse(-1); + System.out.println(integer); + + // Print first names of all users in userStream + StreamSources.userStream().map(User::getFirstName).forEach(System.out::println); + + // Print first names in userStream for users that have IDs from number stream + // Solution-1 + StreamSources.intNumbersStream() + .flatMap(userId -> StreamSources.userStream().filter(user -> user.getId() == userId)) + .map(User::getFirstName) + .forEachOrdered(System.out::println); + // Solution-2 + List userIds = StreamSources.intNumbersStream().toList(); + StreamSources.userStream() + .filter(user -> userIds.contains(user.getId())) + .map(User::getFirstName) + .forEach(System.out::println); + } } diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise2.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise2.java index 6ddbf309..b1839271 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise2.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise2.java @@ -1,21 +1,24 @@ package io.javabrains.reactiveworkshop; +import reactor.core.publisher.Flux; + import java.io.IOException; public class Exercise2 { - public static void main(String[] args) throws IOException { - - // Use ReactiveSources.intNumbersFlux() and ReactiveSources.userFlux() + public static void main(String[] args) throws IOException { - // Print all numbers in the ReactiveSources.intNumbersFlux stream - // TODO: Write code here + // Use ReactiveSources.intNumbersFlux() and ReactiveSources.userFlux() + Flux intNumbersFlux = ReactiveSources.intNumbersFlux(); + Flux userFlux = ReactiveSources.userFlux(); - // Print all users in the ReactiveSources.userFlux stream - // TODO: Write code here + // Print all numbers in the ReactiveSources.intNumbersFlux stream + intNumbersFlux.subscribe(System.out::println); - System.out.println("Press a key to end"); - System.in.read(); - } + // Print all users in the ReactiveSources.userFlux stream + userFlux.subscribe(System.out::println); + System.out.println("Press a key to end"); + System.in.read(); + } } diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise3.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise3.java index 96819d99..55bff0e5 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise3.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise3.java @@ -1,20 +1,26 @@ package io.javabrains.reactiveworkshop; +import reactor.core.publisher.Flux; + import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; public class Exercise3 { - public static void main(String[] args) throws IOException { - - // Use ReactiveSources.intNumbersFlux() + public static void main(String[] args) throws IOException { - // Get all numbers in the ReactiveSources.intNumbersFlux stream - // into a List and print the list and its size - // TODO: Write code here + // Use ReactiveSources.intNumbersFlux() + Flux intNumbersFlux = ReactiveSources.intNumbersFlux(); + // Get all numbers in the ReactiveSources.intNumbersFlux stream + // into a List and print the list and its size + List numbers = intNumbersFlux.toStream().collect(Collectors.toList()); - System.out.println("Press a key to end"); - System.in.read(); - } + System.out.println(numbers); + System.out.println("Size: " + numbers.size()); + System.out.println("Press a key to end"); + System.in.read(); + } } diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise4.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise4.java index f5e91411..e2fced15 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise4.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise4.java @@ -1,21 +1,24 @@ package io.javabrains.reactiveworkshop; +import reactor.core.publisher.Mono; + import java.io.IOException; public class Exercise4 { - public static void main(String[] args) throws IOException { - - // Use ReactiveSources.intNumberMono() + public static void main(String[] args) throws IOException { - // Print the value from intNumberMono when it emits - // TODO: Write code here + // Use ReactiveSources.intNumberMono() + Mono intNumberMono = ReactiveSources.intNumberMono(); - // Get the value from the Mono into an integer variable - // TODO: Write code here + // Print the value from intNumberMono when it emits + intNumberMono.subscribe(System.out::println); - System.out.println("Press a key to end"); - System.in.read(); - } + // Get the value from the Mono into an integer variable + Integer integer = intNumberMono.block(); + System.out.println("Integer value: " + integer); + System.out.println("Press a key to end"); + System.in.read(); + } } diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise5.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise5.java index 754e662f..c1c5b989 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise5.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise5.java @@ -1,21 +1,45 @@ package io.javabrains.reactiveworkshop; +import reactor.core.publisher.BaseSubscriber; + import java.io.IOException; public class Exercise5 { - public static void main(String[] args) throws IOException { + public static void main(String[] args) throws IOException { + + // Use ReactiveSources.intNumberMono() and ReactiveSources.userMono() + + // Subscribe to a flux using the error and completion hooks + ReactiveSources.intNumbersFlux() + .subscribe( + System.out::println, + Throwable::printStackTrace, + () -> System.out.println("Complete signal")); + + // Subscribe to a flux using an implementation of BaseSubscriber + BaseSubscriber baseSubscriber = + new BaseSubscriber<>() { - // Use ReactiveSources.intNumberMono() and ReactiveSources.userMono() + @Override + protected void hookOnNext(Integer value) { + System.out.println(value); + } - // Subscribe to a flux using the error and completion hooks - // TODO: Write code here + @Override + protected void hookOnError(Throwable throwable) { + System.out.println("Exception occurred: " + throwable); + } - // Subscribe to a flux using an implementation of BaseSubscriber - // TODO: Write code here + @Override + protected void hookOnComplete() { + System.out.println("Complete signal"); + } + }; - System.out.println("Press a key to end"); - System.in.read(); - } + ReactiveSources.intNumbersFlux().subscribe(baseSubscriber); -} \ No newline at end of file + System.out.println("Press a key to end"); + System.in.read(); + } +} diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise6.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise6.java index ebc668fb..3fd280bc 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise6.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise6.java @@ -1,23 +1,47 @@ package io.javabrains.reactiveworkshop; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + import java.io.IOException; +import java.time.Duration; +import java.util.List; public class Exercise6 { - - public static void main(String[] args) throws IOException { - - // Use ReactiveSources.unresponsiveFlux() and ReactiveSources.unresponsiveMono() - - // Get the value from the Mono into a String variable but give up after 5 seconds - // TODO: Write code here - - // Get the value from unresponsiveFlux into a String list but give up after 5 seconds - // Come back and do this when you've learnt about operators! - // TODO: Write code here - - System.out.println("Press a key to end"); - System.in.read(); - } - + public static void main(String[] args) throws IOException { + + // Use ReactiveSources.unresponsiveFlux() and ReactiveSources.unresponsiveMono() + Flux unresponsiveFlux = ReactiveSources.unresponsiveFlux(); + Mono unresponsiveMono = ReactiveSources.unresponsiveMono(); + + // Get the value from the Mono into a String variable but give up after 5 seconds + String valueFromMono = + unresponsiveMono + .timeout(Duration.ofSeconds(5L)) + .doOnError( + e -> + System.out.println( + "Could not retrieve the value from unresponsiveMono within 5 seconds")) + .onErrorReturn("") + .block(); + System.out.println(valueFromMono); + + // Get the value from unresponsiveFlux into a String list but give up after 5 seconds + List valueFromFlux = + unresponsiveFlux + .timeout(Duration.ofSeconds(5L)) + .doOnError( + e -> + System.out.println( + "Could not retrieve the value from unresponsiveFlux within 5 seconds")) + .onErrorReturn("") + .collectList() + .block(); + + System.out.println(valueFromFlux); + + System.out.println("Press a key to end"); + System.in.read(); + } } diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise7.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise7.java index 89462325..fd5fdb8f 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise7.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise7.java @@ -1,37 +1,51 @@ package io.javabrains.reactiveworkshop; +import reactor.core.publisher.Flux; + import java.io.IOException; public class Exercise7 { + public static void main(String[] args) throws IOException { - public static void main(String[] args) throws IOException { - - // Use ReactiveSources.intNumberMono() and ReactiveSources.userMono() - - // Print all values from intNumbersFlux that's greater than 5 - // TODO: Write code here + // Use ReactiveSources.intNumberMono() and ReactiveSources.userMono() - // Print 10 times each value from intNumbersFlux that's greater than 5 - // TODO: Write code here + Flux intNumbersFlux = ReactiveSources.intNumbersFlux(); + Flux userFLux = ReactiveSources.userFlux(); + Flux intNumbersFluxWithRepeat = ReactiveSources.intNumbersFluxWithRepeat(); - // Print 10 times each value from intNumbersFlux for the first 3 numbers emitted that's greater than 5 - // TODO: Write code here + // Print all values from intNumbersFlux that's greater than 5 + intNumbersFlux.filter(number -> number > 5).subscribe(System.out::println); - // Print each value from intNumbersFlux that's greater than 20. Print -1 if no elements are found - // TODO: Write code here + // Print 10 times each value from intNumbersFlux that's greater than 5 + intNumbersFlux + .filter(number -> number > 5) + .map(number -> number * 10) + .subscribe(System.out::println); - // Switch ints from intNumbersFlux to the right user from userFlux - // TODO: Write code here + // Print 10 times each value from intNumbersFlux for the first 3 numbers emitted that's greater + // than 5 + intNumbersFlux + .filter(number -> number > 5) + .take(3) + .map(number -> number * 10) + .subscribe(System.out::println); + // Print each value from intNumbersFlux that's greater than 20. Print -1 if no elements are + // found + intNumbersFlux.filter(number -> number > 20).or(Flux.just(-1)).subscribe(System.out::println); - // Print only distinct numbers from intNumbersFluxWithRepeat - // TODO: Write code here + // Switch ints from intNumbersFlux to the right user from userFlux + intNumbersFlux + .flatMap(userId -> userFLux.filter(user -> user.getId() == userId)) + .subscribe(System.out::println); - // Print from intNumbersFluxWithRepeat excluding immediately repeating numbers - // TODO: Write code here + // Print only distinct numbers from intNumbersFluxWithRepeat + intNumbersFluxWithRepeat.distinct().subscribe(System.out::println); - System.out.println("Press a key to end"); - System.in.read(); - } + // Print from intNumbersFluxWithRepeat excluding immediately repeating numbers + intNumbersFluxWithRepeat.distinctUntilChanged().subscribe(System.out::println); + System.out.println("Press a key to end"); + System.in.read(); + } } diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise8.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise8.java index 8cab9738..ecd7e43a 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise8.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise8.java @@ -6,23 +6,26 @@ public class Exercise8 { + public static void main(String[] args) throws IOException { - public static void main(String[] args) throws IOException { + // Use ReactiveSources.intNumbersFluxWithException() + Flux intNumbersFluxWithException = ReactiveSources.intNumbersFluxWithException(); - // Use ReactiveSources.intNumbersFluxWithException() + // Print values from intNumbersFluxWithException and print a message when error happens + intNumbersFluxWithException.subscribe(System.out::println, System.out::println); - // Print values from intNumbersFluxWithException and print a message when error happens - // TODO: Write code here + // Print values from intNumbersFluxWithException and continue on errors + intNumbersFluxWithException + .onErrorContinue((exception, element) -> {}) + .subscribe(System.out::println, System.out::println); - // Print values from intNumbersFluxWithException and continue on errors - // TODO: Write code here - - // Print values from intNumbersFluxWithException and when errors - // happen, replace with a fallback sequence of -1 and -2 - // TODO: Write code here - - System.out.println("Press a key to end"); - System.in.read(); - } + // Print values from intNumbersFluxWithException and when errors + // happen, replace with a fallback sequence of -1 and -2 + intNumbersFluxWithException + .onErrorResume(a -> Flux.just(-1, -2)) + .subscribe(System.out::println, System.out::println); + System.out.println("Press a key to end"); + System.in.read(); + } } diff --git a/src/main/java/io/javabrains/reactiveworkshop/Exercise9.java b/src/main/java/io/javabrains/reactiveworkshop/Exercise9.java index 998bb925..276098fc 100644 --- a/src/main/java/io/javabrains/reactiveworkshop/Exercise9.java +++ b/src/main/java/io/javabrains/reactiveworkshop/Exercise9.java @@ -1,25 +1,33 @@ package io.javabrains.reactiveworkshop; +import reactor.core.publisher.Flux; + import java.io.IOException; public class Exercise9 { + public static void main(String[] args) throws IOException { - public static void main(String[] args) throws IOException { - - // Use ReactiveSources.intNumbersFlux() - - // Print size of intNumbersFlux after the last item returns - // TODO: Write code here + // Use ReactiveSources.intNumbersFlux() + Flux intNumbersFlux = ReactiveSources.intNumbersFlux(); - // Collect all items of intNumbersFlux into a single list and print it - // TODO: Write code here + // Print size of intNumbersFlux after the last item returns + intNumbersFlux + .count() + .subscribe(count -> System.out.println("Size: " + count)); - // Transform to a sequence of sums of adjacent two numbers - // TODO: Write code here + // Collect all items of intNumbersFlux into a single list and print it + intNumbersFlux + .collectList() + .subscribe(System.out::println); - System.out.println("Press a key to end"); - System.in.read(); - } + // Transform to a sequence of sums of adjacent two numbers + intNumbersFlux + .buffer(2) + .map(sequence -> sequence.stream().reduce(0, Integer::sum)) + .subscribe(System.out::println); + System.out.println("Press a key to end"); + System.in.read(); + } }