Skip to content

Commit c3cb70f

Browse files
committed
#469 - Suppress emission of null values when using simple and primitive result types.
Results projected onto simple and primitive types that are null are no longer emitted. A SQL query SELECT MAX(age) FROM my_table that returns a SQL NULL and that would be consumed as Long.class (Publisher<Long>) is an example for a primitive type that can be null. Since Reactive Streams prohibits the propagation of null values by a Publisher to a Subscriber the only viable option is to suppress null results by wrapping the mapping function into Optional result values and filter these values later to avoid null being emitted.
1 parent 7a094b6 commit c3cb70f

File tree

6 files changed

+122
-3
lines changed

6 files changed

+122
-3
lines changed

src/main/java/org/springframework/data/r2dbc/convert/MappingR2dbcConverter.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,15 @@ public Class<?> getTargetType(Class<?> valueType) {
579579
});
580580
}
581581

582+
/*
583+
* (non-Javadoc)
584+
* @see org.springframework.data.r2dbc.convert.R2dbcConverter#isSimpleType(Class)
585+
*/
586+
@Override
587+
public boolean isSimpleType(Class<?> type) {
588+
return getConversions().isSimpleType(type);
589+
}
590+
582591
// ----------------------------------
583592
// Id handling
584593
// ----------------------------------

src/main/java/org/springframework/data/r2dbc/convert/R2dbcConverter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,17 @@ public interface R2dbcConverter
7272
*/
7373
Class<?> getTargetType(Class<?> valueType);
7474

75+
/**
76+
* Return whether the {@code type} is a simple type. Simple types are database primitives or types with a custom
77+
* mapping strategy.
78+
*
79+
* @param valueType the type to inspect, must not be {@literal null}.
80+
* @return {@literal true} if the type is a simple one.
81+
* @see org.springframework.data.mapping.model.SimpleTypeHolder
82+
* @since 1.2
83+
*/
84+
boolean isSimpleType(Class<?> type);
85+
7586
/**
7687
* Returns a {@link java.util.function.Function} that populates the id property of the {@code object} from a
7788
* {@link Row}.

src/main/java/org/springframework/data/r2dbc/core/R2dbcEntityTemplate.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,14 +432,23 @@ private <T> RowsFetchSpec<T> doSelect(Query query, Class<?> entityClass, SqlIden
432432

433433
PreparedOperation<?> operation = statementMapper.getMappedObject(selectSpec);
434434

435+
boolean simpleType;
435436
BiFunction<Row, RowMetadata, T> rowMapper;
436437
if (returnType.isInterface()) {
438+
simpleType = getConverter().isSimpleType(entityClass);
437439
rowMapper = dataAccessStrategy.getRowMapper(entityClass)
438440
.andThen(o -> projectionFactory.createProjection(returnType, o));
439441
} else {
442+
simpleType = getConverter().isSimpleType(returnType);
440443
rowMapper = dataAccessStrategy.getRowMapper(returnType);
441444
}
442445

446+
// avoid top-level null values if the read type is a simple one (e.g. SELECT MAX(age) via Integer.class)
447+
if (simpleType) {
448+
return new UnwrapOptionalFetchSpecAdapter<>(this.databaseClient.sql(operation)
449+
.map((row, metadata) -> Optional.ofNullable(rowMapper.apply(row, metadata))));
450+
}
451+
443452
return this.databaseClient.sql(operation).map(rowMapper);
444453
}
445454

@@ -940,4 +949,28 @@ public Mono<Integer> rowsUpdated() {
940949
}
941950
}
942951
}
952+
953+
private static class UnwrapOptionalFetchSpecAdapter<T> implements RowsFetchSpec<T> {
954+
955+
private final RowsFetchSpec<Optional<T>> delegate;
956+
957+
private UnwrapOptionalFetchSpecAdapter(RowsFetchSpec<Optional<T>> delegate) {
958+
this.delegate = delegate;
959+
}
960+
961+
@Override
962+
public Mono<T> one() {
963+
return delegate.one().handle((optional, sink) -> optional.ifPresent(sink::next));
964+
}
965+
966+
@Override
967+
public Mono<T> first() {
968+
return delegate.first().handle((optional, sink) -> optional.ifPresent(sink::next));
969+
}
970+
971+
@Override
972+
public Flux<T> all() {
973+
return delegate.all().handle((optional, sink) -> optional.ifPresent(sink::next));
974+
}
975+
}
943976
}

src/main/java/org/springframework/data/r2dbc/repository/query/AbstractR2dbcQuery.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import reactor.core.publisher.Flux;
1919
import reactor.core.publisher.Mono;
2020

21+
import java.util.Optional;
22+
2123
import org.reactivestreams.Publisher;
2224

2325
import org.springframework.data.mapping.model.EntityInstantiators;
@@ -100,8 +102,17 @@ private Publisher<?> executeQuery(RelationalParameterAccessor parameterAccessor,
100102
if (isExistsQuery()) {
101103
fetchSpec = (FetchSpec) boundQuery.map(row -> true);
102104
} else if (requiresMapping()) {
103-
EntityRowMapper rowMapper = new EntityRowMapper<>(resolveResultType(processor), converter);
104-
fetchSpec = new FetchSpecAdapter<>(boundQuery.map(rowMapper));
105+
106+
Class<?> resultType = resolveResultType(processor);
107+
EntityRowMapper rowMapper = new EntityRowMapper<>(resultType, converter);
108+
109+
if (converter.isSimpleType(resultType)) {
110+
fetchSpec = new UnwrapOptionalFetchSpecAdapter<>(
111+
boundQuery.map((row, rowMetadata) -> Optional.ofNullable(rowMapper.apply(row, rowMetadata))));
112+
113+
} else {
114+
fetchSpec = new FetchSpecAdapter<>(boundQuery.map(rowMapper));
115+
}
105116
} else {
106117
fetchSpec = (FetchSpec) boundQuery.fetch();
107118
}
@@ -222,4 +233,33 @@ public Mono<Integer> rowsUpdated() {
222233
throw new UnsupportedOperationException("Not supported after applying a row mapper");
223234
}
224235
}
236+
237+
private static class UnwrapOptionalFetchSpecAdapter<T> implements FetchSpec<T> {
238+
239+
private final RowsFetchSpec<Optional<T>> delegate;
240+
241+
private UnwrapOptionalFetchSpecAdapter(RowsFetchSpec<Optional<T>> delegate) {
242+
this.delegate = delegate;
243+
}
244+
245+
@Override
246+
public Mono<T> one() {
247+
return delegate.one().handle((optional, sink) -> optional.ifPresent(sink::next));
248+
}
249+
250+
@Override
251+
public Mono<T> first() {
252+
return delegate.first().handle((optional, sink) -> optional.ifPresent(sink::next));
253+
}
254+
255+
@Override
256+
public Flux<T> all() {
257+
return delegate.all().handle((optional, sink) -> optional.ifPresent(sink::next));
258+
}
259+
260+
@Override
261+
public Mono<Integer> rowsUpdated() {
262+
throw new UnsupportedOperationException("Not supported after applying a row mapper");
263+
}
264+
}
225265
}

src/test/java/org/springframework/data/r2dbc/core/R2dbcEntityTemplateUnitTests.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,25 @@ public void shouldCountBy() {
104104
assertThat(statement.getBindings()).hasSize(1).containsEntry(0, Parameter.from("Walter"));
105105
}
106106

107-
@Test // gh-220
107+
@Test // gh-469
108+
public void shouldProjectExistsResult() {
109+
110+
MockRowMetadata metadata = MockRowMetadata.builder()
111+
.columnMetadata(MockColumnMetadata.builder().name("name").build()).build();
112+
MockResult result = MockResult.builder().rowMetadata(metadata)
113+
.row(MockRow.builder().identified(0, Object.class, null).build()).build();
114+
115+
recorder.addStubbing(s -> s.startsWith("SELECT"), result);
116+
117+
entityTemplate.select(Person.class) //
118+
.as(Integer.class) //
119+
.matching(Query.empty().columns("MAX(age)")) //
120+
.all() //
121+
.as(StepVerifier::create) //
122+
.verifyComplete();
123+
}
124+
125+
@Test // gh-469
108126
public void shouldExistsByCriteria() {
109127

110128
MockRowMetadata metadata = MockRowMetadata.builder()

src/test/java/org/springframework/data/r2dbc/repository/H2R2dbcRepositoryIntegrationTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ protected Class<? extends LegoSetRepository> getRepositoryInterfaceType() {
9292
return H2LegoSetRepository.class;
9393
}
9494

95+
@Test // gh-469
96+
public void shouldSuppressNullValues() {
97+
repository.findMax("doo").as(StepVerifier::create).verifyComplete();
98+
}
99+
95100
@Test // gh-235
96101
public void shouldReturnUpdateCount() {
97102

@@ -139,6 +144,9 @@ public void shouldInsertIdOnlyEntity() {
139144

140145
interface H2LegoSetRepository extends LegoSetRepository {
141146

147+
@Query("SELECT MAX(manual) FROM legoset WHERE name = :name")
148+
Mono<Integer> findMax(String name);
149+
142150
@Override
143151
@Query("SELECT name FROM legoset")
144152
Flux<Named> findAsProjection();

0 commit comments

Comments
 (0)