diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java index 48d9c597ee4d2..4b8f25d9e4ced 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.integration; +import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -35,15 +36,19 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueTransformerWithKey; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.AggregationWithHeaders; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.SessionStoreWithHeaders; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders; @@ -556,4 +561,245 @@ public void process(final Record record) { assertEquals(KeyValue.pair("key1", "value1"), outputTopic.readKeyValue()); } } + + @Test + public void processorShouldAccessKStreamAggregatedKTableStoreAsHeadersStoreViaSupplier() { + final StreamsBuilder builder = new StreamsBuilder(); + + final Materialized> materialized = + Materialized.as(Stores.persistentTimestampedKeyValueStoreWithHeaders("table-store")); + + builder + .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey() + .aggregate( + () -> "", + (key, value, aggregate) -> value, + materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) + ) + .toStream() + .process(() -> new ContextualProcessor() { + @Override + public void process(final Record record) { + final TimestampedKeyValueStoreWithHeaders store = context().getStateStore("table-store"); + + try (final KeyValueIterator> it = store.all()) { + while (it.hasNext()) { + final KeyValue> row = it.next(); + context().forward(new Record<>(row.key, row.value.value(), row.value.timestamp())); + } + } + } + }, "table-store") + .to("output-topic", Produced.with(Serdes.String(), Serdes.String())); + + try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build())) { + final TestInputTopic inputTopic = testDriver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer()); + final TestOutputTopic outputTopic = testDriver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer()); + + inputTopic.pipeInput("key1", "value1"); + + assertEquals(KeyValue.pair("key1", "value1"), outputTopic.readKeyValue()); + } + } + + @Test + public void processorShouldAccessKStreamReducedKTableStoreAsHeadersStoreViaSupplier() { + final StreamsBuilder builder = new StreamsBuilder(); + + final Materialized> materialized = + Materialized.as(Stores.persistentTimestampedKeyValueStoreWithHeaders("table-store")); + + builder + .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey() + .reduce( + (value, aggregate) -> value, + materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) + ) + .toStream() + .process(() -> new ContextualProcessor() { + @Override + public void process(final Record record) { + final TimestampedKeyValueStoreWithHeaders store = context().getStateStore("table-store"); + + try (final KeyValueIterator> it = store.all()) { + while (it.hasNext()) { + final KeyValue> row = it.next(); + context().forward(new Record<>(row.key, row.value.value(), row.value.timestamp())); + } + } + } + }, "table-store") + .to("output-topic", Produced.with(Serdes.String(), Serdes.String())); + + try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build())) { + final TestInputTopic inputTopic = testDriver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer()); + final TestOutputTopic outputTopic = testDriver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer()); + + inputTopic.pipeInput("key1", "value1"); + + assertEquals(KeyValue.pair("key1", "value1"), outputTopic.readKeyValue()); + } + } + + @Test + public void processorShouldAccessKStreamCountKTableStoreAsHeadersStoreViaSupplier() { + final StreamsBuilder builder = new StreamsBuilder(); + + builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey() + .count(Materialized.as(Stores.persistentTimestampedKeyValueStoreWithHeaders("table-store"))) + .toStream() + .process(() -> new ContextualProcessor() { + @Override + public void process(final Record record) { + final TimestampedKeyValueStoreWithHeaders store = context().getStateStore("table-store"); + + try (final KeyValueIterator> it = store.all()) { + while (it.hasNext()) { + final KeyValue> row = it.next(); + context().forward(new Record<>(row.key, row.value.value(), row.value.timestamp())); + } + } + } + }, "table-store") + .to("output-topic", Produced.with(Serdes.String(), Serdes.Long())); + + try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build())) { + final TestInputTopic inputTopic = testDriver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer()); + final TestOutputTopic outputTopic = testDriver.createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer()); + + inputTopic.pipeInput("key1", "value1"); + + assertEquals(KeyValue.pair("key1", 1L), outputTopic.readKeyValue()); + } + } + + @Test + public void processorShouldBuildTopologyWithWindowStoreWithHeadersViaSupplier() { + final StreamsBuilder builder = new StreamsBuilder(); + + final Materialized> materialized = + Materialized.as(Stores.persistentTimestampedWindowStoreWithHeaders("table-store", Duration.ofHours(24L), Duration.ofHours(1L), false)); + + builder + .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey() + .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1L))) + .aggregate( + () -> "", + (key, value, aggregate) -> value, + materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) + ) + .toStream() + .process(() -> new ContextualProcessor, String, Windowed, String>() { + @Override + public void process(final Record, String> record) { + final WindowStore> store = context().getStateStore("table-store"); + + try (final KeyValueIterator, ValueTimestampHeaders> it = store.all()) { + while (it.hasNext()) { + final KeyValue, ValueTimestampHeaders> row = it.next(); + context().forward(new Record<>(row.key, row.value.value(), row.value.timestamp())); + } + } + } + }, "table-store") + .to("output-topic", Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class, Duration.ofHours(1L).toMillis()), Serdes.String())); + + // Verify topology can be built and run with window headers store supplier + try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build())) { + final TestInputTopic inputTopic = testDriver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer()); + final TestOutputTopic, String> outputTopic = testDriver.createOutputTopic("output-topic", new TimeWindowedDeserializer<>(new StringDeserializer(), Duration.ofHours(1L).toMillis()), new StringDeserializer()); + + inputTopic.pipeInput("key1", "value1"); + + assertEquals("value1", outputTopic.readKeyValue().value); + } + } + + @Test + public void processorShouldAccessKStreamSessionAggregatedKTableStoreAsHeadersStoreViaSupplier() { + final StreamsBuilder builder = new StreamsBuilder(); + + final Materialized> materialized = + Materialized.as(Stores.persistentSessionStoreWithHeaders("table-store", Duration.ofHours(1L))); + + builder + .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey() + .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofHours(1L))) + .aggregate( + () -> "", + (key, value, aggregate) -> value, + (key, left, right) -> left, + materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) + ) + .toStream((windowedKey, value) -> windowedKey.key()) + .process(() -> new ContextualProcessor() { + @Override + public void process(final Record record) { + final SessionStoreWithHeaders store = context().getStateStore("table-store"); + + try (final KeyValueIterator, AggregationWithHeaders> it = store.findSessions("key1", 0L, Long.MAX_VALUE)) { + while (it.hasNext()) { + final KeyValue, AggregationWithHeaders> row = it.next(); + context().forward(new Record<>(row.key.key(), row.value.aggregation(), record.timestamp())); + } + } + } + }, "table-store") + .to("output-topic", Produced.with(Serdes.String(), Serdes.String())); + + try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build())) { + final TestInputTopic inputTopic = testDriver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer()); + final TestOutputTopic outputTopic = testDriver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer()); + + inputTopic.pipeInput("key1", "value1"); + + assertEquals(KeyValue.pair("key1", "value1"), outputTopic.readKeyValue()); + } + } + + @Test + public void processorShouldAccessKStreamSessionReducedKTableStoreAsHeadersStoreViaSupplier() { + final StreamsBuilder builder = new StreamsBuilder(); + + final Materialized> materialized = + Materialized.as(Stores.persistentSessionStoreWithHeaders("table-store", Duration.ofHours(1L))); + + builder + .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey() + .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofHours(1L))) + .reduce( + (value, aggregate) -> value, + materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) + ) + .toStream((windowedKey, value) -> windowedKey.key()) + .process(() -> new ContextualProcessor() { + @Override + public void process(final Record record) { + final SessionStoreWithHeaders store = context().getStateStore("table-store"); + + try (final KeyValueIterator, AggregationWithHeaders> it = store.findSessions("key1", 0L, Long.MAX_VALUE)) { + while (it.hasNext()) { + final KeyValue, AggregationWithHeaders> row = it.next(); + context().forward(new Record<>(row.key.key(), row.value.aggregation(), record.timestamp())); + } + } + } + }, "table-store") + .to("output-topic", Produced.with(Serdes.String(), Serdes.String())); + + try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build())) { + final TestInputTopic inputTopic = testDriver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer()); + final TestOutputTopic outputTopic = testDriver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer()); + + inputTopic.pipeInput("key1", "value1"); + + assertEquals(KeyValue.pair("key1", "value1"), outputTopic.readKeyValue()); + } + } }