Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
- [ ] Exercise 1 --> Data Streams
- [ ] Exercise 2 -->
- [ ] Exercise 3 -->
- [ ] Exercise 4 -->
- [ ] Exercise 5 -->
- [ ] Exercise 6 -->
31 changes: 30 additions & 1 deletion src/main/java/io/javabrains/reactiveworkshop/Exercise1.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.javabrains.reactiveworkshop;

import java.util.List;
import java.util.stream.Collectors;

public class Exercise1 {

public static void main(String[] args) {
Expand All @@ -8,23 +11,49 @@ public static void main(String[] args) {

// Print all numbers in the intNumbersStream stream
// TODO: Write code here
StreamSources.intNumbersStream()
.forEach(System.out::println);

// Print numbers from intNumbersStream that are less than 5
// TODO: Write code here
StreamSources.intNumbersStream()
.filter(number -> number < 5)
.forEach(System.out::println);

// Print the second and third numbers in intNumbersStream that's greater than 5
// TODO: Write code here
StreamSources.intNumbersStream()
.filter(number -> number < 5)
.skip(1)
.limit(2)
.forEach(System.out::println);

// Print the first number in intNumbersStream that's greater than 5.
// If nothing is found, print -1
// TODO: Write code here
Integer value = StreamSources.intNumbersStream()
.filter(number -> number > 5)
.findFirst().orElse(-1);
System.out.println(value);

// Print first names of all users in userStream
// TODO: Write code here
StreamSources.userStream().forEach(user -> System.out.println(user.getFirstName()));

// Print first names in userStream for users that have IDs from number stream
// TODO: Write code here

List<Integer> numbers = StreamSources.intNumbersStream().collect(Collectors.toList());

StreamSources.userStream()
.filter(user -> numbers.contains(user.getId()))
.forEach(user -> System.out.println(user.getFirstName()));

// OR
StreamSources.intNumbersStream()
.flatMap(id -> StreamSources.userStream()
.filter(user -> user.getId() == id))
.map(user -> user.getFirstName())
.forEach(System.out::println);
}

}
4 changes: 4 additions & 0 deletions src/main/java/io/javabrains/reactiveworkshop/Exercise2.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ public static void main(String[] args) throws IOException {

// Print all numbers in the ReactiveSources.intNumbersFlux stream
// TODO: Write code here
ReactiveSources.intNumbersFlux()
.subscribe(integer -> System.out.println(integer));

// Print all users in the ReactiveSources.userFlux stream
// TODO: Write code here
ReactiveSources.userFlux()
.subscribe(user -> System.out.println(user));

System.out.println("Press a key to end");
System.in.read();
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/javabrains/reactiveworkshop/Exercise3.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.javabrains.reactiveworkshop;

import java.io.IOException;
import java.util.List;

public class Exercise3 {

Expand All @@ -11,6 +12,9 @@ public static void main(String[] args) throws IOException {
// Get all numbers in the ReactiveSources.intNumbersFlux stream
// into a List and print the list and its size
// TODO: Write code here
List<Integer> intNumbersList = ReactiveSources.intNumbersFlux().toStream().toList();
//Takes 10s (because delay was set in the flux generation)
System.out.println("List of numbers: " + intNumbersList);


System.out.println("Press a key to end");
Expand Down
17 changes: 15 additions & 2 deletions src/main/java/io/javabrains/reactiveworkshop/Exercise4.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,31 @@
package io.javabrains.reactiveworkshop;

import java.io.IOException;
import java.util.Optional;

public class Exercise4 {

public static void main(String[] args) throws IOException {

// Use ReactiveSources.intNumberMono()

// Print the value from intNumberMono when it emits

// TODO: Write code here
// NOTE : non-blocking call
ReactiveSources.intNumberMono().subscribe(number -> System.out.println(number));

// Get the value from the Mono into an integer variable
// TODO: Write code here
// NOTE : blocking call
Integer number = ReactiveSources.intNumberMono().block();
System.out.println("After blocking: " + number);

// OR

Optional<Integer> numberOptional = ReactiveSources.intNumberMono().blockOptional();
System.out.println("After blocking optional: " + (numberOptional.isPresent() ? numberOptional.get() : 0));

// NOTE : Mono can return a list


System.out.println("Press a key to end");
System.in.read();
Expand Down
31 changes: 30 additions & 1 deletion src/main/java/io/javabrains/reactiveworkshop/Exercise5.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,50 @@
package io.javabrains.reactiveworkshop;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

import java.io.IOException;

public class Exercise5 {

public static void main(String[] args) throws IOException {

// Use ReactiveSources.intNumberMono() and ReactiveSources.userMono()
// Use ReactiveSources.intNumberFlux() and ReactiveSources.userMono()

// Subscribe to a flux using the error and completion hooks
// TODO: Write code here
ReactiveSources.intNumbersFlux().subscribe(
number -> System.out.println(number),
error -> System.out.println(error.getMessage()),
() -> System.out.println("Completed !!")
);

ReactiveSources.userMono().subscribe(
user -> System.out.println(user),
error -> System.out.println(error.getMessage()),
() -> System.out.println("Completed !!")
);

// NOTE : The above two subscribers don't go one after the other, the order is unpredictable.

// Subscribe to a flux using an implementation of BaseSubscriber
// TODO: Write code here
ReactiveSources.intNumbersFlux().subscribe(new MySubscriber<>());

System.out.println("Press a key to end");
System.in.read();
}

static class MySubscriber<T> extends BaseSubscriber<T> {
public void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribe happened.");
request(1);
}

public void hookOnNext(T value) {
System.out.println(value.toString() + " received.");
request(1);
}
}

}