Following Spring WebFlux Workshop
-
Hello World Router and Handler functions
Router Function
@Configuration public class QuoteRouter { @Bean public RouterFunction<ServerResponse> helloRouter(QuoteHandler handler) { return RouterFunctions .route() .GET("/hello-world", accept(TEXT_PLAIN), handler::hello) .build(); } }Handler Function
@Component public class QuoteHandler { public Mono<ServerResponse> hello(ServerRequest request) { return ServerResponse .ok() .contentType(TEXT_PLAIN) .syncBody("hello") .log(); } }
-
Echo Router and Handler functions
Router Function
@Bean public RouterFunction<ServerResponse> echoRouter(QuoteHandler handler) { return RouterFunctions .route() .POST("/echo", accept(TEXT_PLAIN), handler::echo) .build(); }Handler Function
public Mono<ServerResponse> echo(ServerRequest request) { return ServerResponse .ok() .body(request.bodyToMono(String.class), String.class); } -
Router Function
@Bean public RouterFunction<ServerResponse> quoteRouter(QuoteHandler handler) { return RouterFunctions .route() .GET("/quotes", accept(APPLICATION_STREAM_JSON), handler::streamQuotes) .build(); }Handler Function
@Component public class QuoteHandler { private Flux<Quote> quotes; public QuoteHandler(QuoteGenerator quoteGenerator) { quotes = quoteGenerator .fetchQuoteStream(Duration.ofMillis(200L)) .share(); } public Mono<ServerResponse> streamQuotes(ServerRequest request) { return ServerResponse .ok() .contentType(APPLICATION_STREAM_JSON) .body(quotes, Quote.class); }-
.share()creates a multicast hot flux, that is cancelled when no subscribers are left -
see also
publish()…-
.connect()-
connects regardless of subscribers
-
-
.autoConnect()-
autoconnects on first subscriber
-
-
.refCount()-
shorthand for
.share() -
connects on first subscriber
-
cancels when no subscribers left
-
-
-
-
Router Function
NoteObserve content type different from streaming endpoint @Bean public RouterFunction<ServerResponse> quoteLimitedVomitter(QuoteHandler handler) { return RouterFunctions .route() .GET("/quotes", accept(APPLICATION_JSON), handler::streamNQuotes) .build(); }Handler Function
public Mono<ServerResponse> streamNQuotes(ServerRequest request) { Long size = request .queryParam("size") .map(Long::valueOf) .orElse(10L); return ServerResponse .ok() .contentType(APPLICATION_JSON) .body(lotsOfQuotes.take(size), Quote.class); }
-
Integration Tests with WebTestClient
Test Class
@RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = RANDOM_PORT) public class DemostockQuotesApplicationTests { @Autowired private WebTestClient webTestClient; ... }NoteRANDOM_PORTstarts server locally on a random port
@Autowiredinjects a fully configuredWebTestClientFixed Size Response Test
@Test public void testFetchQuotes() { webTestClient .get().uri("/quotes?size=20") .accept(APPLICATION_JSON) .exchange() .expectStatus().isOk() .expectHeader().contentType(APPLICATION_JSON) .expectBodyList(Quote.class).hasSize(20) .consumeWith(allQuotes -> assertThat(allQuotes.getResponseBody()) .allSatisfy(quote -> assertThat(quote.getPrice()) .isPositive())); }Streaming Endpoint Test
@Test public void testFetchQuotesAsStreamWithStepVerifier() { StepVerifier .create(webTestClient.get().uri("/quotes") .accept(APPLICATION_STREAM_JSON) .exchange() .expectStatus().isOk() .expectHeader().contentType(APPLICATION_STREAM_JSON) .returnResult(Quote.class) .getResponseBody()) .thenRequest(30) .thenConsumeWhile(quote -> quote.getPrice().signum() > 0) .expectNextCount(30) .thenCancel(); }NoteUsed StepVerifierto inspect infinite stream’s first couple of elements,
instead of original test, which collects to a list and blocks.
-
Note
New Spring Boot App with Reactive Web, Devtools, Thymeleaf, and Reactive Mongo TradingUser Entity
@Document public class TradingUser { @Id private String id; private String userName; private String fullName; //... }Reactive Mongo Repository
public interface TradingUserRepository extends ReactiveMongoRepository<TradingUser, String> { Mono<TradingUser> findUserByUserName(String userName); }CommandLineRunner to Fill Mongo Repo
@Component public class UsersCommandLineRunner implements CommandLineRunner { private final TradingUserRepository repo; public UsersCommandLineRunner(TradingUserRepository repo) { this.repo = repo; } @Override public void run(String... args) throws Exception { List<TradingUser> users = List.of( //... ); repo.insert(users).blockLast(Duration.ofSeconds(3)); } }Restcontroller for Users
@GetMapping(value = "/users", produces = MediaType.APPLICATION_JSON_VALUE) public Flux<TradingUser> getAllUsers() { return repo.findAll(); } @GetMapping(value = "/users/{username}", produces = MediaType.APPLICATION_JSON_VALUE) public Mono<TradingUser> getUserByName(@PathVariable("username") String userName) { return repo.findUserByUserName(userName); }
-
Controller Unit Tests
@RunWith(SpringRunner.class) @WebFluxTest public class UserControllerTest { @MockBean private TradingUserRepository mockRepository; @Autowired private WebTestClient webClient; //... }Get All Users
@Test public void testGetAllUsers() { var users = List.of( new TradingUser("wabbit_one", "Sebastien Wabbit"), new TradingUser("not_a_wabbit", "Not a Wabbit") ); given(mockRepository.findAll()) .willReturn(Flux.fromIterable(users)); webClient.get() .uri("/users") .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus().isOk() .expectBodyList(TradingUser.class) .isEqualTo(users); }Find User
@Test public void getUserByName() { var user = new TradingUser("wabbit_one", "Sebastien Wabbit"); given(mockRepository.findUserByUserName("wabbit_one")) .willReturn(Mono.just(user)); webClient.get() .uri("/users/wabbit_one") .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus().isOk() .expectBody(TradingUser.class) .isEqualTo(user); }