Skip to content

Commit 0ecf489

Browse files
committed
Address review: include headers in toString, cache typed reference in KeyValueStoreWrapper, normalize null headers in ChangeLogging
1 parent ad87718 commit 0ecf489

5 files changed

Lines changed: 15 additions & 7 deletions

File tree

streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStoreWithHeaders.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@ public interface VersionedKeyValueStoreWithHeaders<K, V> extends VersionedKeyVal
3636
* @param key The key
3737
* @param value The value, it can be {@code null}. {@code null} is interpreted as a delete.
3838
* @param timestamp The timestamp for this record version
39-
* @param headers The record headers
39+
* @param headers The record headers; must not be {@code null}
4040
* @return The validTo timestamp of the newly put record, or special values as defined
4141
* in {@link VersionedKeyValueStore#put(Object, Object, long)}.
42+
* @throws NullPointerException if {@code headers} is {@code null}
4243
*/
4344
long put(K key, V value, long timestamp, Headers headers);
4445
}

streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public Headers headers() {
106106

107107
@Override
108108
public String toString() {
109-
return "<" + value + "," + timestamp + "," + validTo + ">";
109+
return "<" + value + "," + timestamp + "," + validTo + "," + headers + ">";
110110
}
111111

112112
@Override

streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,9 @@ public long put(final Bytes key, final byte[] value, final long timestamp) {
4747

4848
@Override
4949
public long put(final Bytes key, final byte[] value, final long timestamp, final Headers headers) {
50-
final long validTo = inner.put(key, value, timestamp, headers);
51-
log(key, value, timestamp, headers != null ? headers : new RecordHeaders());
50+
final Headers nonNullHeaders = headers != null ? headers : new RecordHeaders();
51+
final long validTo = inner.put(key, value, timestamp, nonNullHeaders);
52+
log(key, value, timestamp, nonNullHeaders);
5253
return validTo;
5354
}
5455

streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public class KeyValueStoreWrapper<K, V> implements StateStore {
5151

5252
private TimestampedKeyValueStoreWithHeaders<K, V> headersStore = null;
5353
private VersionedKeyValueStore<K, V> versionedStore = null;
54+
private VersionedKeyValueStoreWithHeaders<K, V> versionedStoreWithHeaders = null;
5455

5556
// same as either timestampedStore or versionedStore above. kept merely as a convenience
5657
// to simplify implementation for methods which do not depend on store type.
@@ -79,6 +80,9 @@ public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final String s
7980
try {
8081
versionedStore = context.getStateStore(storeName);
8182
store = versionedStore;
83+
if (versionedStore instanceof VersionedKeyValueStoreWithHeaders) {
84+
versionedStoreWithHeaders = (VersionedKeyValueStoreWithHeaders<K, V>) versionedStore;
85+
}
8286
} catch (final ClassCastException e) {
8387
store = context.getStateStore(storeName);
8488
final String storeType = store == null ? "null" : store.getClass().getName();
@@ -129,8 +133,8 @@ public long put(final K key, final V value, final long timestamp, final Headers
129133
return PUT_RETURN_CODE_IS_LATEST;
130134
}
131135
if (versionedStore != null) {
132-
if (versionedStore instanceof VersionedKeyValueStoreWithHeaders) {
133-
return ((VersionedKeyValueStoreWithHeaders<K, V>) versionedStore).put(key, value, timestamp, headers);
136+
if (versionedStoreWithHeaders != null) {
137+
return versionedStoreWithHeaders.put(key, value, timestamp, headers);
134138
}
135139
return versionedStore.put(key, value, timestamp);
136140
}

streams/src/test/java/org/apache/kafka/streams/state/VersionedRecordWithHeadersTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ public void shouldPreserveEqualityForRecordsWithDifferentHeaders() {
118118
@Test
119119
public void shouldReturnCorrectToString() {
120120
final VersionedRecord<String> record = new VersionedRecord<>("value", 42L);
121-
assertEquals("<value,42,Optional.empty>", record.toString());
121+
final String str = record.toString();
122+
assertTrue(str.startsWith("<value,42,Optional.empty,"));
123+
assertTrue(str.contains("RecordHeaders"));
122124
}
123125
}

0 commit comments

Comments
 (0)