diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/Routes.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/Routes.kt index 8b92dcd6..1aa8d830 100644 --- a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/Routes.kt +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/Routes.kt @@ -57,6 +57,9 @@ object Routes { object DeadLetter { const val LETTERS = "dlq-query-dead-letters" const val SEQUENCE_SIZE = "dlq-query-dead-letter-sequence-size" + // Paginated lookup of letters within a single sequence — added in AF5 framework-client + // 5.1.0 so the platform UI can browse very long sequences without loading them all. + const val SEQUENCE_LETTERS = "dlq-query-dead-letter-sequence-letters" const val DELETE_SEQUENCE = "dlq-command-delete-sequence" const val DELETE_ALL_SEQUENCES = "dlq-command-delete-all-sequences" diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientIdentification.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientIdentification.kt index 92adc2a7..d17e2198 100644 --- a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientIdentification.kt +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientIdentification.kt @@ -104,8 +104,10 @@ data class SupportedFeatures( val pauseReports: Boolean? = false, /* Whether the client supports thread dumps.*/ val threadDump: Boolean? = false, - /* Whether the client supports DLQ insights. Can be FULL, LIMITED, MASKED, or NONE (default).*/ - val deadLetterQueuesInsights: AxoniqConsoleDlqMode = AxoniqConsoleDlqMode.NONE, + /* DLQ insight level for this client. `null` means the application has no DLQ library on + * its classpath, so DLQ inspection isn't a feature of this client at all (distinct from + * `NONE`, which means the feature exists but the operator hid all letter data). */ + val deadLetterQueuesInsights: AxoniqConsoleDlqMode? = null, /* Whether the client supports domain events insights. Can be FULL, LOAD_DOMAIN_STATE_ONLY, PREVIEW_PAYLOAD_ONLY, or NONE (default).*/ val domainEventsInsights: DomainEventAccessMode = DomainEventAccessMode.NONE, /* Whether the client supports client status updates .*/ diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/deadLetterApi.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/deadLetterApi.kt index 49f1bbd6..b9f7786d 100644 --- a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/deadLetterApi.kt +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/deadLetterApi.kt @@ -71,3 +71,28 @@ data class DeadLetterProcessRequest( val processingGroup: String, val messageIdentifier: String ) + +/** + * Request paginated letters belonging to a single sequence inside the DLQ. Used by the platform UI + * detail modal to browse long sequences without loading them all up-front. + * + * @param processingGroup The processing group / DLQ identifier. + * @param sequenceIdentifier Synthetic sequence id as previously returned by [DeadLetter.sequenceIdentifier]. + * @param offset Zero-based offset into the sequence. + * @param size Number of letters to return (capped server-side). + */ +data class FetchSequenceLettersRequest( + val processingGroup: String, + val sequenceIdentifier: String, + val offset: Int, + val size: Int, +) + +/** + * Response payload for [FetchSequenceLettersRequest]. Carries the requested slice of letters along + * with the total number of letters in the sequence so the UI can render full pagination. + */ +data class SequenceLettersResponse( + val letters: List, + val totalCount: Long = letters.size.toLong(), +) diff --git a/framework-client/pom.xml b/framework-client/pom.xml index fe44c45e..52880799 100644 --- a/framework-client/pom.xml +++ b/framework-client/pom.xml @@ -83,6 +83,12 @@ provided true + + io.axoniq.framework + axoniq-dead-letter + provided + true + tools.jackson.core diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfiguration.java b/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfiguration.java index c0c2feeb..21299d98 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfiguration.java +++ b/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfiguration.java @@ -17,9 +17,13 @@ package io.axoniq.platform.framework; import io.axoniq.platform.framework.api.DomainEventAccessMode; +import io.axoniq.platform.framework.api.AxoniqConsoleDlqMode; import org.axonframework.common.BuilderUtils; import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -44,6 +48,9 @@ public class AxoniqPlatformConfiguration { private DomainEventAccessMode domainEventAccessMode = DomainEventAccessMode.NONE; + private AxoniqConsoleDlqMode dlqMode = AxoniqConsoleDlqMode.NONE; + private List dlqDiagnosticsWhitelist = new ArrayList<>(); + /** * Constructor to instantiate a {@link AxoniqPlatformConfiguration} based on the fields contained in the * {@link AxoniqPlatformConfiguration}. Requires the {@code environmentId}, {@code accessToken} and @@ -208,4 +215,44 @@ public Long getInitialDelay() { public DomainEventAccessMode getDomainEventAccessMode() { return domainEventAccessMode; } + + /** + * Controls how much DLQ data is exposed through the platform API. Defaults to + * {@link AxoniqConsoleDlqMode#NONE} so applications must deliberately opt into exposing letter + * contents (which may include personal data and would make the platform a data processor under + * GDPR). Use {@code MASKED} when sequence identifiers must still be addressable but contents + * must not leak, {@code LIMITED} to strip payload but keep sequence identifiers as-is for + * filtered diagnostics, or {@code FULL} for unrestricted access (typically only safe in + * development). + * + * @param dlqMode The dead-letter exposure mode. + * @return The builder for fluent interfacing. + */ + public AxoniqPlatformConfiguration dlqMode(AxoniqConsoleDlqMode dlqMode) { + BuilderUtils.assertNonNull(dlqMode, "Axoniq Platform dlqMode may not be null"); + this.dlqMode = dlqMode; + return this; + } + + /** + * Adds a diagnostics metadata key to the whitelist that survives {@link AxoniqConsoleDlqMode#LIMITED} + * and {@link AxoniqConsoleDlqMode#MASKED} modes. All other keys are dropped before the letter + * leaves this client. + * + * @param key The diagnostics metadata key to permit. + * @return The builder for fluent interfacing. + */ + public AxoniqPlatformConfiguration addDlqDiagnosticsWhitelistKey(String key) { + BuilderUtils.assertNonEmpty(key, "Axoniq Platform diagnostics whitelist key may not be null or empty"); + this.dlqDiagnosticsWhitelist.add(key); + return this; + } + + public AxoniqConsoleDlqMode getDlqMode() { + return dlqMode; + } + + public List getDlqDiagnosticsWhitelist() { + return Collections.unmodifiableList(dlqDiagnosticsWhitelist); + } } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/client/SetupPayloadCreator.kt b/framework-client/src/main/java/io/axoniq/platform/framework/client/SetupPayloadCreator.kt index bbd73461..70819c79 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/client/SetupPayloadCreator.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/client/SetupPayloadCreator.kt @@ -18,6 +18,7 @@ package io.axoniq.platform.framework.client import io.axoniq.platform.framework.AxoniqPlatformConfiguration import io.axoniq.platform.framework.api.AxonServerEventStoreMessageSourceInformation +import io.axoniq.platform.framework.api.AxoniqConsoleDlqMode import io.axoniq.platform.framework.api.CommandBusInformation import io.axoniq.platform.framework.api.DomainEventAccessMode import io.axoniq.platform.framework.api.CommonProcessorInformation @@ -87,6 +88,7 @@ class SetupPayloadCreator( licenseEntitlement = hasEntitlementManager(), modelInspection = hasStateManager(), domainEventsInsights = resolveDomainEventAccessMode(), + deadLetterQueuesInsights = resolveDeadLetterQueuesInsights(), ) ) } @@ -363,6 +365,38 @@ class SetupPayloadCreator( } } + /** + * Resolves the [AxoniqPlatformConfiguration] from the application configuration, returning `null` + * when the platform module hasn't been wired (legacy or non-Spring setups). The caller falls back + * to `NONE` so applications that haven't deliberately opted in stay closed by default — exposing + * letter contents (which can include personal data) requires an explicit `dlqMode` override + * (`LIMITED`/`MASKED`/`FULL`) on the application's [AxoniqPlatformConfiguration]. + */ + private fun axoniqPlatformConfiguration(): AxoniqPlatformConfiguration? = + configuration.getOptionalComponent(AxoniqPlatformConfiguration::class.java).orElse(null) + + /** + * Returns the DLQ insight level reported on the setup payload, or `null` when this application + * has no DLQ library on its classpath (in which case DLQ inspection isn't a feature of this + * client at all — semantically distinct from [AxoniqConsoleDlqMode.NONE], which means the feature + * exists but the operator hid all letter data). + */ + private fun resolveDeadLetterQueuesInsights(): AxoniqConsoleDlqMode? { + if (!isDeadLetterLibraryAvailable()) return null + return axoniqPlatformConfiguration()?.dlqMode ?: AxoniqConsoleDlqMode.NONE + } + + private fun isDeadLetterLibraryAvailable(): Boolean = try { + Class.forName( + "io.axoniq.framework.messaging.deadletter.SequencedDeadLetterQueue", + false, + SetupPayloadCreator::class.java.classLoader, + ) + true + } catch (_: ClassNotFoundException) { + false + } + /** * Checks whether the PlatformLicenseSource have been configured, in which case we want updates of licenses from Platform. */ diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/AxoniqPlatformDeadLetterConfigurerEnhancer.java b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/AxoniqPlatformDeadLetterConfigurerEnhancer.java new file mode 100644 index 00000000..a66b737e --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/AxoniqPlatformDeadLetterConfigurerEnhancer.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventprocessor; + +import io.axoniq.platform.framework.AxoniqPlatformConfiguration; +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar; +import org.axonframework.common.configuration.ComponentDefinition; +import org.axonframework.common.configuration.ComponentRegistry; +import org.axonframework.common.configuration.ConfigurationEnhancer; +import org.axonframework.common.lifecycle.Phase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.axoniq.platform.framework.AxoniqPlatformConfigurerEnhancer.PLATFORM_ENHANCER_ORDER; + +/** + * Service-loaded enhancer that registers the dead-letter queue inspection components only when the + * {@code axoniq-dead-letter} module is present on the classpath. Kept free of direct references to + * {@link DeadLetterManager} or {@link RSocketDlqResponder} (which import optional types) so the class can be + * loaded even when the addon is absent. + */ +public class AxoniqPlatformDeadLetterConfigurerEnhancer implements ConfigurationEnhancer { + + private static final Logger LOGGER = + LoggerFactory.getLogger(AxoniqPlatformDeadLetterConfigurerEnhancer.class); + private static final String DEAD_LETTER_PROBE_CLASS = + "io.axoniq.framework.messaging.deadletter.SequencedDeadLetterQueue"; + + @Override + public void enhance(ComponentRegistry registry) { + if (!registry.hasComponent(AxoniqPlatformConfiguration.class)) { + return; + } + // Enhancers can be invoked more than once during context refresh — bail out if the DLQ + // components are already registered to avoid ComponentOverrideException. + if (registry.hasComponent(DeadLetterManager.class) + || registry.hasComponent(ProcessingGroupInfoSource.class)) { + return; + } + if (!isClasspathAvailable()) { + LOGGER.debug("axoniq-dead-letter not on classpath; skipping dead-letter queue inspection wiring."); + return; + } + register(registry); + } + + @Override + public int order() { + // Run after the main platform enhancer so the RSocketHandlerRegistrar component is already declared. + return PLATFORM_ENHANCER_ORDER + 1; + } + + private static void register(ComponentRegistry registry) { + registry.registerComponent(ComponentDefinition + .ofType(DeadLetterManager.class) + .withBuilder(c -> { + AxoniqPlatformConfiguration platformConfig = + c.getComponent(AxoniqPlatformConfiguration.class); + return new DeadLetterManager( + c, + platformConfig.getDlqMode(), + platformConfig.getDlqDiagnosticsWhitelist()); + }) + // Discover DLQs after event processors have started, by which point the + // EventHandlingComponent decorator chain has materialised every DLQ. + .onStart(Phase.INSTRUCTION_COMPONENTS, DeadLetterManager::start)); + + // The Spring-backed ComponentRegistry exposes a registered component under all of its + // implemented interfaces automatically, so registering DeadLetterManager already makes + // ProcessingGroupInfoSource available. The plain AF5 ComponentRegistry is exact-typed + // though, so only register the seam there to keep ProcessorReportCreator's lookup + // (`getOptionalComponent(ProcessingGroupInfoSource.class)`) working in both worlds. + if (!registry.hasComponent(ProcessingGroupInfoSource.class)) { + registry.registerComponent(ComponentDefinition + .ofType(ProcessingGroupInfoSource.class) + .withBuilder(c -> c.getComponent(DeadLetterManager.class))); + } + + registry.registerComponent(ComponentDefinition + .ofType(RSocketDlqResponder.class) + .withBuilder(c -> new RSocketDlqResponder( + c.getComponent(DeadLetterManager.class), + c.getComponent(RSocketHandlerRegistrar.class))) + .onStart(Phase.EXTERNAL_CONNECTIONS, RSocketDlqResponder::start)); + } + + private static boolean isClasspathAvailable() { + try { + Class.forName(DEAD_LETTER_PROBE_CLASS, false, + AxoniqPlatformDeadLetterConfigurerEnhancer.class.getClassLoader()); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/AxoniqPlatformEventHandlingComponent.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/AxoniqPlatformEventHandlingComponent.kt index 0ac37ac7..5e3badc8 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/AxoniqPlatformEventHandlingComponent.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/AxoniqPlatformEventHandlingComponent.kt @@ -29,17 +29,14 @@ import org.axonframework.messaging.core.Message import org.axonframework.messaging.core.MessageStream import org.axonframework.messaging.core.QualifiedName import org.axonframework.messaging.core.unitofwork.ProcessingContext -import org.axonframework.messaging.eventhandling.EventHandler -import org.axonframework.messaging.eventhandling.EventHandlerRegistry import org.axonframework.messaging.eventhandling.EventHandlingComponent import org.axonframework.messaging.eventhandling.EventMessage import org.axonframework.messaging.eventhandling.processing.streaming.segmenting.Segment -import org.axonframework.messaging.eventhandling.replay.ResetContext import java.time.Instant import java.time.temporal.ChronoUnit class AxoniqPlatformEventHandlingComponent( - private val delegate: EventHandlingComponent, + val delegate: EventHandlingComponent, private val processorName: String, private val handlerMetricsRegistry: HandlerMetricsRegistry, private val processorMetricRegistry: ProcessorMetricsRegistry diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/DeadLetterManager.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/DeadLetterManager.kt new file mode 100644 index 00000000..3ead64ff --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/DeadLetterManager.kt @@ -0,0 +1,511 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventprocessor + +import io.axoniq.framework.messaging.deadletter.DeadLetter +import io.axoniq.framework.messaging.deadletter.SequencedDeadLetterProcessor +import io.axoniq.framework.messaging.deadletter.SequencedDeadLetterQueue +import io.axoniq.platform.framework.api.AxoniqConsoleDlqMode +import io.axoniq.platform.framework.api.DeadLetterResponse +import io.axoniq.platform.framework.api.SequenceLettersResponse +import org.apache.commons.codec.digest.DigestUtils +import org.axonframework.common.configuration.Configuration +import org.axonframework.messaging.core.EmptyApplicationContext +import org.axonframework.messaging.core.Metadata +import org.axonframework.messaging.core.unitofwork.ProcessingContext +import org.axonframework.messaging.core.unitofwork.SimpleUnitOfWorkFactory +import org.axonframework.messaging.core.unitofwork.UnitOfWorkFactory +import org.axonframework.messaging.eventhandling.EventHandlingComponent +import org.axonframework.messaging.eventhandling.EventMessage +import org.slf4j.LoggerFactory +import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit +import io.axoniq.platform.framework.api.DeadLetter as ApiDeadLetter + +private const val LETTER_PAYLOAD_SIZE_LIMIT = 1024 +private const val MASKED = "" +private const val LIMITED = "" +private val logger = LoggerFactory.getLogger(DeadLetterManager::class.java) + +/** + * Inspects and operates on the dead-letter queues belonging to event handling components configured on this + * application. + * + * In AF5 each event handling component within a Pooled Streaming processor may have its own dead-letter queue. + * Queues are registered in the [Configuration] under names of the form + * `DeadLetterQueue[EventHandlingComponent[][]]`. + * + * To stay compatible with the platform's AF4-based DLQ API (which expects a single "processing group" identifier per + * DLQ) this manager exposes each DLQ under a synthesised identifier: + * - if a processor has a single DLQ the identifier equals the processor name (matches the issue requirement); + * - if a processor has multiple DLQs each is exposed as `::` so they remain + * addressable individually. + * + * Sequence identifiers exposed through the API come from the [EventHandlingComponent]'s configured sequencing + * policy (via [EventHandlingComponent.sequenceIdentifierFor]), matching AF4 semantics. This makes sequence ids + * stable across letter eviction — deleting the first letter no longer renames the sequence as it did under the + * earlier "first letter's message id" synthetic scheme. + */ +class DeadLetterManager @JvmOverloads constructor( + private val configuration: Configuration, + private val dlqMode: AxoniqConsoleDlqMode = AxoniqConsoleDlqMode.NONE, + private val dlqDiagnosticsWhitelist: List = emptyList(), +) : ProcessingGroupInfoSource { + + @Volatile + private var entries: List? = null + + /** + * Factory used to materialise a real [org.axonframework.messaging.core.unitofwork.UnitOfWork] when the + * manager needs to call [EventHandlingComponent.sequenceIdentifierFor] on a dead letter — that call + * requires a non-null [ProcessingContext] because some decorator layers (notably + * `SequenceCachingEventHandlingComponent`) store per-event resources on the context. + * + * An [EmptyApplicationContext] is used because the stock sequencing-policy chain (constant, + * property, metadata, hierarchical, fallback) does not look up application components; the context + * is only consulted as a resource bag. If a custom policy ever needs richer context resolution the + * wiring can be revisited. + */ + private val unitOfWorkFactory: UnitOfWorkFactory = + SimpleUnitOfWorkFactory(EmptyApplicationContext.INSTANCE) + + /** + * Discovers the DLQs configured on this application by walking each event-processor module. + * Called once via the lifecycle; subsequent invocations refresh the cached view. + * + * Logs the active [dlqMode] at INFO so operators can confirm from application logs that the + * configured exposure level (`axoniq.platform.dlq-mode`) has actually taken effect. We stay at + * INFO regardless of mode because some users alert on WARN-and-above and an expected + * configuration choice shouldn't trip those alerts. + */ + fun start() { + entries = discoverEntries() + when (dlqMode) { + AxoniqConsoleDlqMode.FULL -> logger.info( + "Axoniq Platform DLQ inspection initialised in FULL mode — payloads, causes and diagnostics are exposed verbatim.") + AxoniqConsoleDlqMode.LIMITED -> logger.info( + "Axoniq Platform DLQ inspection initialised in LIMITED mode — payloads are hidden; diagnostics are filtered through whitelist {} (empty whitelist removes all diagnostic entries).", + dlqDiagnosticsWhitelist) + AxoniqConsoleDlqMode.MASKED -> logger.info( + "Axoniq Platform DLQ inspection initialised in MASKED mode — sequence ids are SHA-256 hashed; payloads, causes and diagnostics are not exposed. Operator delete/process actions still work via the hashed identifier.") + AxoniqConsoleDlqMode.NONE -> logger.info( + "Axoniq Platform DLQ inspection initialised in NONE mode — only sequence counts are exposed. List queries return empty results regardless of letter contents.") + } + } + + override fun infoFor(processorName: String): List = + dlqInfoForProcessor(processorName).map { + ProcessingGroupInfoSource.ProcessingGroupInfo(it.processingGroup, it.sequenceCount) + } + + + /** + * Internal view of a discovered DLQ together with all metadata required to address it through the public API. + * + * The [eventHandlingComponent] reference is captured during discovery so the sequence identifier of every + * letter can be derived from the same [EventHandlingComponent.sequenceIdentifierFor] the framework uses on + * enqueue. May be `null` if the component cannot be resolved from the configuration — in that case the + * manager falls back to the letter's own message id (documented in [sequenceIdentifierFor]). + */ + private data class DlqEntry( + val processingGroup: String, + val processorName: String, + val componentName: String, + val dlq: SequencedDeadLetterQueue, + val processor: SequencedDeadLetterProcessor, + val eventHandlingComponent: EventHandlingComponent?, + ) + + private val dlqNamePattern = + Regex("""^DeadLetterQueue\[EventHandlingComponent\[([^]]+)]\[(.+)]]$""") + + fun deadLetters( + processingGroup: String, + offset: Int = 0, + size: Int = 25, + // Capped at 10 to keep poll payloads small; long sequences are browsed via sequenceLetters(...). + maxSequenceLetters: Int = 10, + ): DeadLetterResponse { + val entry = dlqFor(processingGroup) + if (dlqMode == AxoniqConsoleDlqMode.NONE) { + return DeadLetterResponse(emptyList(), entry.dlq.amountOfSequences(null).join()) + } + val sequences = entry.dlq.deadLetters(null).join() + val pageOfSequences = sequences + .drop(offset) + .take(size) + .map { sequence -> + val letters = sequence.toList() + val rawSequenceId = letters.firstOrNull()?.let { sequenceIdentifierFor(entry, it) } ?: "" + val apiSequenceId = if (dlqMode == AxoniqConsoleDlqMode.MASKED) rawSequenceId.hashed() else rawSequenceId + letters + .take(maxSequenceLetters) + .map { it.toApiLetter(apiSequenceId) } + } + val total = entry.dlq.amountOfSequences(null).join() + return DeadLetterResponse(pageOfSequences, total) + } + + fun sequenceSize(processingGroup: String, sequenceIdentifier: String): Long { + val entry = dlqFor(processingGroup) + return findSequence(entry, sequenceIdentifier)?.count()?.toLong() ?: 0L + } + + /** + * Returns a paginated slice of letters belonging to the sequence identified by [sequenceIdentifier]. + * Used by the platform UI's detail modal so very long sequences can be browsed without loading + * them all up-front through the [deadLetters] batch query. + */ + fun lettersForSequence( + processingGroup: String, + sequenceIdentifier: String, + offset: Int, + size: Int, + ): SequenceLettersResponse { + val entry = dlqFor(processingGroup) + if (dlqMode == AxoniqConsoleDlqMode.NONE) { + return SequenceLettersResponse(emptyList(), 0) + } + val sequence = findSequence(entry, sequenceIdentifier) + ?: return SequenceLettersResponse(emptyList(), 0) + val total = sequence.size.toLong() + val safeOffset = offset.coerceAtLeast(0) + val safeSize = size.coerceAtLeast(1) + val slice = sequence + .drop(safeOffset) + .take(safeSize) + .map { it.toApiLetter(sequenceIdentifier) } + return SequenceLettersResponse(slice, total) + } + + /** + * Evicts every letter belonging to the sequence identified by [sequenceIdentifier]. + * + * @return the number of letters that were actually evicted (0 when the id no longer resolves — e.g. the + * operator's view was stale). + */ + fun delete(processingGroup: String, sequenceIdentifier: String): Int { + val entry = dlqFor(processingGroup) + val sequence = findSequence(entry, sequenceIdentifier) + if (sequence == null) { + logger.warn( + "DLQ delete-sequence: no sequence in [{}] matches id [{}] — nothing to evict", + processingGroup, sequenceIdentifier, + ) + return 0 + } + logger.info( + "DLQ delete-sequence: evicting {} letters from sequence [{}] in [{}]", + sequence.size, sequenceIdentifier, processingGroup, + ) + var evicted = 0 + sequence.forEach { + entry.dlq.evict(it, null).join() + evicted++ + } + return evicted + } + + /** + * Evicts a single letter identified by [messageIdentifier] from the sequence identified by + * [sequenceIdentifier]. Returns `true` when an eviction was performed; `false` indicates the + * sequence id or message id no longer resolves (typically because the caller's view was stale). + */ + fun delete(processingGroup: String, sequenceIdentifier: String, messageIdentifier: String): Boolean { + val entry = dlqFor(processingGroup) + val sequence = findSequence(entry, sequenceIdentifier) + if (sequence == null) { + logger.warn( + "DLQ delete-letter: no sequence in [{}] matches id [{}] (message id was [{}]) — caller view likely stale", + processingGroup, sequenceIdentifier, messageIdentifier, + ) + return false + } + val target = sequence.firstOrNull { it.message().identifier() == messageIdentifier } + if (target == null) { + logger.warn( + "DLQ delete-letter: sequence [{}] in [{}] (size={}) does not contain message id [{}] — already evicted?", + sequenceIdentifier, processingGroup, sequence.size, messageIdentifier, + ) + return false + } + logger.info( + "DLQ delete-letter: evicting message [{}] from sequence [{}] in [{}]", + messageIdentifier, sequenceIdentifier, processingGroup, + ) + entry.dlq.evict(target, null).join() + return true + } + + /** + * Resolves a DLQ sequence by the identifier this manager exposes through the API. Walks every sequence + * in the queue, derives each sequence's identifier via [sequenceIdentifierFor], and matches. + * + * When [dlqMode] is [AxoniqConsoleDlqMode.MASKED] the API-side identifier is a SHA-256 hash, so the + * lookup compares the hash of each candidate id against the supplied [sequenceIdentifier] — this keeps + * the delete/process operations working even when the operator only sees masked ids. + */ + private fun findSequence( + entry: DlqEntry, + sequenceIdentifier: String, + ): List>? { + val sequences = entry.dlq.deadLetters(null).join() + // Track candidates so we can log a diagnostic when nothing matches — the most common cause + // is a stale identifier (the sequence's first letter has changed) or a mode mismatch between + // the value the UI cached and what the manager now computes. Capped to keep log noise low. + val candidates = mutableListOf() + for (sequence in sequences) { + val letters = sequence.toList() + val firstLetter = letters.firstOrNull() ?: continue + val rawId = sequenceIdentifierFor(entry, firstLetter) + val candidateId = if (dlqMode == AxoniqConsoleDlqMode.MASKED) rawId.hashed() else rawId + if (candidateId == sequenceIdentifier) { + return letters + } + if (candidates.size < 5) candidates.add(candidateId) + } + logger.warn( + "DLQ findSequence: no sequence in [{}] matches id [{}] (dlqMode={}, scanned {} sequence(s), first {} candidate id(s): {}). Operator's view may be stale, or the first letter of the sequence has changed since the list query.", + entry.processingGroup, + sequenceIdentifier, + dlqMode, + candidates.size, + candidates.size, + candidates, + ) + return null + } + + fun process(processingGroup: String, messageIdentifier: String): Boolean { + val processor = dlqFor(processingGroup).processor + return processor.process { it.message().identifier() == messageIdentifier } + .get(60, TimeUnit.SECONDS) + } + + fun processAll( + processingGroup: String, + maxMessages: Int? = null, + timeoutSeconds: Long = 600, + ): Int { + val processor = dlqFor(processingGroup).processor + var processed = 0 + val deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds) + while (maxMessages == null || processed < maxMessages) { + if (System.nanoTime() > deadline) break + val didProcess = processor.process { true }.get(timeoutSeconds, TimeUnit.SECONDS) + if (!didProcess) break + processed++ + } + return processed + } + + fun deleteAll(processingGroup: String, timeoutSeconds: Long = 600): Int { + val dlq = dlqFor(processingGroup).dlq + val totalCount = dlq.size(null).get(timeoutSeconds, TimeUnit.SECONDS).toInt() + dlq.clear(null).get(timeoutSeconds, TimeUnit.SECONDS) + return totalCount + } + + /** + * Returns the DLQ entries belonging to the given processor — used by [ProcessorReportCreator] to surface DLQ size + * per processing group in the processor report. + */ + fun dlqInfoForProcessor(processorName: String): List = + discover() + .filter { it.processorName == processorName } + .map { DlqInfo(it.processingGroup, it.dlq.amountOfSequences(null).join()) } + + private fun dlqFor(processingGroup: String): DlqEntry = + discover().firstOrNull { it.processingGroup == processingGroup } + ?: throw IllegalArgumentException( + "There is no dead-letter queue for processing group [$processingGroup]") + + @Suppress("UNCHECKED_CAST") + private fun discoverEntries(): List { + data class Parsed( + val module: Configuration, + val processor: String, + val component: String, + val dlq: SequencedDeadLetterQueue, + ) + + val parsed = configuration.moduleConfigurations.flatMap { module -> + module.getComponents(SequencedDeadLetterQueue::class.java) + .mapNotNull { (name, dlq) -> + val match = dlqNamePattern.find(name) ?: return@mapNotNull null + Parsed( + module = module, + processor = match.groupValues[1], + component = match.groupValues[2], + dlq = dlq as SequencedDeadLetterQueue, + ) + } + } + val perProcessor = parsed.groupingBy { it.processor }.eachCount() + return parsed.map { + val ehcName = "EventHandlingComponent[${it.processor}][${it.component}]" + val processor = it.module + .getOptionalComponent(EventHandlingComponent::class.java, ehcName) + .map { ehc -> + if(ehc is AxoniqPlatformEventHandlingComponent) { + ehc.delegate as? SequencedDeadLetterProcessor<*> + } else null + } + .orElseThrow { + IllegalStateException( + "Component [$ehcName] is not wrapped with dead-letter processing") + } as SequencedDeadLetterProcessor + // The EHC is needed for sequence-identifier resolution. Looking it up here (once per discovery + // run) keeps the hot path on deadLetters/lettersForSequence cheap and matches the "discover once" + // shape of the rest of this manager. + val ehc = it.module + .getOptionalComponent(EventHandlingComponent::class.java, ehcName) + .orElse(null) + DlqEntry( + processingGroup = if (perProcessor[it.processor] == 1) it.processor else "${it.processor}::${it.component}", + processorName = it.processor, + componentName = it.component, + dlq = it.dlq, + processor = processor, + eventHandlingComponent = ehc, + ) + } + } + + private fun discover(): List = entries ?: discoverEntries().also { entries = it } + + /** + * Resolves the sequence identifier for a letter by spinning up a real + * [org.axonframework.messaging.core.unitofwork.UnitOfWork] and calling + * [EventHandlingComponent.sequenceIdentifierFor] with its [ProcessingContext]. The UoW gives the + * decorator chain (including `SequenceCachingEventHandlingComponent`) a non-null context to read + * resources from, matching the framework's own invariants and avoiding the NPE that calling with + * `null` would trigger. The UoW does no real work — the lambda completes synchronously on the + * direct executor, so there's no scheduling cost. Result shape mirrors the AF4 implementation: + * - String results are used verbatim; + * - non-String results fall back to `hashCode().toString()`; + * - if the EHC reference could not be captured at discovery time, or sequence resolution throws + * or returns `null`, the letter's message identifier is used so each letter still has a + * unique id. + */ + private fun sequenceIdentifierFor( + entry: DlqEntry, + letter: DeadLetter, + ): String { + val message = letter.message() + val ehc = entry.eventHandlingComponent ?: return message.identifier() + val raw: Any? = try { + unitOfWorkFactory.create().executeWithResult { context: ProcessingContext -> + CompletableFuture.completedFuture(ehc.sequenceIdentifierFor(message, context)) + }.join() + } catch (ex: Exception) { + logger.debug( + "Sequence identifier resolution threw for message [{}] in [{}] — falling back to message id.", + message.identifier(), entry.processingGroup, ex, + ) + null + } + return when (raw) { + null -> message.identifier() + is String -> raw + else -> raw.hashCode().toString() + } + } + + private fun DeadLetter.toApiLetter(sequenceIdentifier: String): ApiDeadLetter { + val message = this.message() + // `sequenceIdentifier` is expected to be in its final API form (hashed in MASKED, raw in FULL/ + // LIMITED). Hashing is the caller's responsibility — see `deadLetters(...)`. + return when (dlqMode) { + AxoniqConsoleDlqMode.NONE -> error( + "DLQ in NONE mode must not serialise letters — short-circuit in deadLetters/lettersForSequence was bypassed.") + AxoniqConsoleDlqMode.MASKED -> ApiDeadLetter( + messageIdentifier = message.identifier(), + message = MASKED, + messageType = messageTypeOf(message), + causeType = this.cause().map { it.type() }.orElse(null), + causeMessage = this.cause().map { MASKED }.orElse(null), + enqueuedAt = this.enqueuedAt(), + lastTouched = this.lastTouched(), + diagnostics = emptyMap(), + sequenceIdentifier = sequenceIdentifier, + ) + AxoniqConsoleDlqMode.LIMITED -> ApiDeadLetter( + messageIdentifier = message.identifier(), + message = LIMITED, + messageType = messageTypeOf(message), + causeType = this.cause().map { it.type() }.orElse(null), + causeMessage = this.cause().map { LIMITED }.orElse(null), + enqueuedAt = this.enqueuedAt(), + lastTouched = this.lastTouched(), + diagnostics = this.diagnostics().filteredByWhitelist(), + sequenceIdentifier = sequenceIdentifier, + ) + AxoniqConsoleDlqMode.FULL -> ApiDeadLetter( + messageIdentifier = message.identifier(), + message = serializePayload(message), + messageType = messageTypeOf(message), + causeType = this.cause().map { it.type() }.orElse(null), + causeMessage = this.cause().map { it.message() }.orElse(null), + enqueuedAt = this.enqueuedAt(), + lastTouched = this.lastTouched(), + diagnostics = this.diagnostics(), + sequenceIdentifier = sequenceIdentifier, + ) + } + } + + /** + * Best-effort human-readable type name for the message. In AF5 the qualified name carried on the + * message's [org.axonframework.messaging.core.MessageType] is the primary type identifier and is + * always set; the payload class is only a fallback for the unlikely case the type lookup throws. + */ + private fun messageTypeOf(message: EventMessage): String = + runCatching { message.type().name() }.getOrDefault(message.payloadType().name) + + private fun serializePayload(message: EventMessage): String { + val raw: String = try { + when (val payload = message.payload()) { + null -> "" + is ByteArray -> String(payload, Charsets.UTF_8) + is String -> payload + else -> payload.toString() + } + } catch (_: Exception) { + "" + } + // UTF-8-safe truncation so multi-byte characters can't get split mid-codepoint. + return raw.toByteArray(Charsets.UTF_8) + .let { if (it.size <= LETTER_PAYLOAD_SIZE_LIMIT) raw else String(it, 0, LETTER_PAYLOAD_SIZE_LIMIT, Charsets.UTF_8) } + } + + /** + * Applies the whitelist filter used in LIMITED mode. Returns only entries whose key is in the + * configured whitelist; an empty whitelist removes all diagnostics. + */ + private fun Metadata.filteredByWhitelist(): Map = + if (dlqDiagnosticsWhitelist.isEmpty()) emptyMap() + else subset(*dlqDiagnosticsWhitelist.toTypedArray()) + + private fun String.hashed(): String = DigestUtils.sha256Hex(this) + + /** + * Lightweight DTO returned to [ProcessorReportCreator] so it can populate per-processor DLQ size information + * without exposing the full dead-letter API. + */ + data class DlqInfo(val processingGroup: String, val sequenceCount: Long) +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/ProcessingGroupInfoSource.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/ProcessingGroupInfoSource.kt new file mode 100644 index 00000000..13ec7c89 --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/ProcessingGroupInfoSource.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventprocessor + +/** + * Always-loadable seam used by [ProcessorReportCreator] to learn about processing groups (and their DLQ size, if + * any) belonging to a processor. + * + * Carrying this contract in a class with no references to the optional `axoniq-dead-letter` types lets + * [ProcessorReportCreator] stay free of those types so it can run on classpaths where the addon is absent. + */ +interface ProcessingGroupInfoSource { + + fun infoFor(processorName: String): List + + data class ProcessingGroupInfo( + val processingGroup: String, + val dlqSize: Long?, + ) +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/ProcessorReportCreator.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/ProcessorReportCreator.kt index 494ebf2c..9c88c68f 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/ProcessorReportCreator.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/ProcessorReportCreator.kt @@ -16,6 +16,7 @@ package io.axoniq.platform.framework.eventprocessor +import io.axoniq.platform.framework.api.ProcessingGroupStatus import io.axoniq.platform.framework.api.ProcessorMode import io.axoniq.platform.framework.api.ProcessorStatus import io.axoniq.platform.framework.api.ProcessorStatusReport @@ -30,9 +31,14 @@ import org.axonframework.messaging.eventhandling.processing.streaming.pooled.Poo import org.axonframework.messaging.eventhandling.processing.streaming.segmenting.EventTrackerStatus import org.axonframework.messaging.eventhandling.processing.streaming.token.store.TokenStore import org.axonframework.messaging.eventhandling.processing.subscribing.SubscribingEventProcessor +import org.slf4j.LoggerFactory class ProcessorReportCreator(private val processingConfig: Configuration) { + private val logger = LoggerFactory.getLogger(ProcessorReportCreator::class.java) private val metricsRegistry = processingConfig.getComponent(ProcessorMetricsRegistry::class.java) + // Optional — only present when an addon (currently only axoniq-dead-letter) registers a source. + private val processingGroupInfoSource: ProcessingGroupInfoSource? = + processingConfig.getOptionalComponent(ProcessingGroupInfoSource::class.java).orElse(null) companion object { const val MULTI_TENANT_PROCESSOR_CLASS = "org.axonframework.extensions.multitenancy.components.eventhandeling.MultiTenantEventProcessor" } @@ -51,7 +57,7 @@ class ProcessorReportCreator(private val processingConfig: Configuration) { private fun streamingStatus(name: String, processor: StreamingEventProcessor) = ProcessorStatus( name, - emptyList(), + processingGroupsFor(name), processor.tokenStoreIdentifier, processor.toType(), processor.isRunning, @@ -61,6 +67,18 @@ class ProcessorReportCreator(private val processingConfig: Configuration) { processor.processingStatus().map { (_, segment) -> segment.toStatus(name) }, ) + private fun processingGroupsFor(processorName: String): List { + val source = processingGroupInfoSource ?: return emptyList() + return try { + source.infoFor(processorName) + .map { ProcessingGroupStatus(it.processingGroup, it.dlqSize) } + } catch (e: Exception) { + // A failing probe must not break processor reporting. + logger.warn("Failed to collect processing group information for processor [{}]", processorName, e) + emptyList() + } + } + private fun subscribingStatus(name: String, processor: SubscribingEventProcessor) = ProcessorStatus( name, diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/RSocketDlqResponder.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/RSocketDlqResponder.kt new file mode 100644 index 00000000..c0f31172 --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/RSocketDlqResponder.kt @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventprocessor + +import io.axoniq.platform.framework.api.DeadLetterProcessRequest +import io.axoniq.platform.framework.api.DeadLetterRequest +import io.axoniq.platform.framework.api.DeadLetterResponse +import io.axoniq.platform.framework.api.DeadLetterSequenceDeleteRequest +import io.axoniq.platform.framework.api.DeadLetterSequenceSize +import io.axoniq.platform.framework.api.DeadLetterSingleDeleteRequest +import io.axoniq.platform.framework.api.DeleteAllDeadLetterSequencesRequest +import io.axoniq.platform.framework.api.FetchSequenceLettersRequest +import io.axoniq.platform.framework.api.ProcessAllDeadLetterSequencesRequest +import io.axoniq.platform.framework.api.Routes +import io.axoniq.platform.framework.api.SequenceLettersResponse +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar +import org.slf4j.LoggerFactory + +open class RSocketDlqResponder( + private val deadLetterManager: DeadLetterManager, + private val registrar: RSocketHandlerRegistrar, +) { + private val logger = LoggerFactory.getLogger(this::class.java) + + fun start() { + registrar.registerHandlerWithPayload( + Routes.ProcessingGroup.DeadLetter.LETTERS, + DeadLetterRequest::class.java, + this::handleDeadLetterQuery, + ) + registrar.registerHandlerWithPayload( + Routes.ProcessingGroup.DeadLetter.SEQUENCE_SIZE, + DeadLetterSequenceSize::class.java, + this::handleSequenceSizeQuery, + ) + registrar.registerHandlerWithPayload( + Routes.ProcessingGroup.DeadLetter.SEQUENCE_LETTERS, + FetchSequenceLettersRequest::class.java, + this::handleSequenceLettersQuery, + ) + registrar.registerHandlerWithPayload( + Routes.ProcessingGroup.DeadLetter.DELETE_SEQUENCE, + DeadLetterSequenceDeleteRequest::class.java, + this::handleDeleteSequenceCommand, + ) + registrar.registerHandlerWithPayload( + Routes.ProcessingGroup.DeadLetter.DELETE_LETTER, + DeadLetterSingleDeleteRequest::class.java, + this::handleDeleteLetterCommand, + ) + registrar.registerHandlerWithPayload( + Routes.ProcessingGroup.DeadLetter.PROCESS, + DeadLetterProcessRequest::class.java, + this::handleProcessCommand, + ) + registrar.registerHandlerWithPayload( + Routes.ProcessingGroup.DeadLetter.PROCESS_ALL_SEQUENCES, + ProcessAllDeadLetterSequencesRequest::class.java, + this::handleProcessAllSequencesCommand, + ) + registrar.registerHandlerWithPayload( + Routes.ProcessingGroup.DeadLetter.DELETE_ALL_SEQUENCES, + DeleteAllDeadLetterSequencesRequest::class.java, + this::handleDeleteAllSequencesCommand, + ) + } + + private fun handleDeadLetterQuery(request: DeadLetterRequest): DeadLetterResponse { + logger.debug("Handling Axoniq Platform DEAD_LETTERS query [{}]", request) + return deadLetterManager.deadLetters( + request.processingGroup, + request.offset, + request.size, + request.maxSequenceLetters, + ) + } + + private fun handleSequenceSizeQuery(request: DeadLetterSequenceSize): Long { + logger.debug( + "Handling Axoniq Platform DEAD_LETTER_SEQUENCE_SIZE query for processing group [{}]", + request.processingGroup, + ) + return deadLetterManager.sequenceSize(request.processingGroup, request.sequenceIdentifier) + } + + private fun handleSequenceLettersQuery(request: FetchSequenceLettersRequest): SequenceLettersResponse { + logger.debug( + "Handling Axoniq Platform DEAD_LETTER_SEQUENCE_LETTERS query for processing group [{}], sequence [{}], offset={}, size={}", + request.processingGroup, + request.sequenceIdentifier, + request.offset, + request.size, + ) + return deadLetterManager.lettersForSequence( + request.processingGroup, + request.sequenceIdentifier, + request.offset, + request.size, + ) + } + + private fun handleDeleteSequenceCommand(request: DeadLetterSequenceDeleteRequest) { + logger.debug( + "Handling Axoniq Platform DELETE_SEQUENCE command for processing group [{}], sequence [{}]", + request.processingGroup, request.sequenceIdentifier, + ) + val evicted = deadLetterManager.delete(request.processingGroup, request.sequenceIdentifier) + logger.info( + "DELETE_SEQUENCE for [{}] sequence [{}] → evicted {} letter(s)", + request.processingGroup, request.sequenceIdentifier, evicted, + ) + } + + private fun handleDeleteLetterCommand(request: DeadLetterSingleDeleteRequest) { + logger.debug( + "Handling Axoniq Platform DELETE_LETTER command for processing group [{}], sequence [{}], message [{}]", + request.processingGroup, request.sequenceIdentifier, request.messageIdentifier, + ) + val evicted = deadLetterManager.delete( + request.processingGroup, + request.sequenceIdentifier, + request.messageIdentifier, + ) + logger.info( + "DELETE_LETTER for [{}] sequence [{}] message [{}] → {}", + request.processingGroup, request.sequenceIdentifier, request.messageIdentifier, + if (evicted) "evicted" else "no-op (id no longer resolves)", + ) + } + + private fun handleProcessCommand(request: DeadLetterProcessRequest): Boolean { + logger.debug( + "Handling Axoniq Platform PROCESS command for processing group [{}]", + request.processingGroup, + ) + return deadLetterManager.process(request.processingGroup, request.messageIdentifier) + } + + private fun handleProcessAllSequencesCommand(request: ProcessAllDeadLetterSequencesRequest): Int { + logger.debug( + "Handling Axoniq Platform PROCESS_ALL_SEQUENCES command for processing group [{}]", + request.processingGroup, + ) + return deadLetterManager.processAll(request.processingGroup, request.maxMessages) + } + + private fun handleDeleteAllSequencesCommand(request: DeleteAllDeadLetterSequencesRequest): Int { + logger.debug( + "Handling Axoniq Platform DELETE_ALL_SEQUENCES command for processing group [{}]", + request.processingGroup, + ) + return deadLetterManager.deleteAll(request.processingGroup) + } +} diff --git a/framework-client/src/main/resources/META-INF/services/org.axonframework.common.configuration.ConfigurationEnhancer b/framework-client/src/main/resources/META-INF/services/org.axonframework.common.configuration.ConfigurationEnhancer index 7dd9a2cb..d4cb1715 100644 --- a/framework-client/src/main/resources/META-INF/services/org.axonframework.common.configuration.ConfigurationEnhancer +++ b/framework-client/src/main/resources/META-INF/services/org.axonframework.common.configuration.ConfigurationEnhancer @@ -18,4 +18,5 @@ io.axoniq.platform.framework.AxoniqPlatformConfigurerEnhancer io.axoniq.platform.framework.messaging.distributed.AxoniqPlatformDistributedMessagingConfigurerEnhancer io.axoniq.platform.framework.modelling.AxoniqPlatformModellingConfigurationEnhancer io.axoniq.platform.framework.eventsourcing.AxoniqPlatformEventsourcingConfigurerEnhancer -io.axoniq.platform.framework.eventsourcing.AxoniqPlatformModelInspectionEnhancer \ No newline at end of file +io.axoniq.platform.framework.eventsourcing.AxoniqPlatformModelInspectionEnhancer +io.axoniq.platform.framework.eventprocessor.AxoniqPlatformDeadLetterConfigurerEnhancer \ No newline at end of file diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/AxoniqPlatformDeadLetterConfigurerEnhancerTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/AxoniqPlatformDeadLetterConfigurerEnhancerTest.kt new file mode 100644 index 00000000..a2305d14 --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/AxoniqPlatformDeadLetterConfigurerEnhancerTest.kt @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventprocessor + +import io.axoniq.platform.framework.AxoniqPlatformConfiguration +import io.axoniq.platform.framework.AxoniqPlatformConfigurerEnhancer +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import org.axonframework.common.configuration.ComponentDefinition +import org.axonframework.common.configuration.ComponentRegistry +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +/** + * Verifies the guards in [AxoniqPlatformDeadLetterConfigurerEnhancer]: the DLQ components must + * register only when the host application is wired with the platform client + * ([AxoniqPlatformConfiguration] present) and the addon hasn't already been registered. + * + * The classpath probe is intentionally NOT covered here — `axoniq-dead-letter` is on the test + * classpath via `provided` scope, so [Class.forName] always succeeds in tests. Exercising the + * absent branch would require classloader trickery that adds more risk than coverage. + */ +class AxoniqPlatformDeadLetterConfigurerEnhancerTest { + + private val enhancer = AxoniqPlatformDeadLetterConfigurerEnhancer() + + @Test + fun `registers all three components when AxoniqPlatformConfiguration is present and neither DLQ component is registered yet`() { + val registry = mockk(relaxed = true) + every { registry.hasComponent(AxoniqPlatformConfiguration::class.java) } returns true + every { registry.hasComponent(DeadLetterManager::class.java) } returns false + every { registry.hasComponent(ProcessingGroupInfoSource::class.java) } returns false + + enhancer.enhance(registry) + + // DeadLetterManager + ProcessingGroupInfoSource + RSocketDlqResponder + verify(exactly = 3) { registry.registerComponent(any>()) } + } + + @Test + fun `no-op when AxoniqPlatformConfiguration is absent — host is not a platform client`() { + val registry = mockk(relaxed = true) + every { registry.hasComponent(AxoniqPlatformConfiguration::class.java) } returns false + + enhancer.enhance(registry) + + verify(exactly = 0) { registry.registerComponent(any>()) } + } + + @Test + fun `idempotent — no registrations when DeadLetterManager is already registered`() { + val registry = mockk(relaxed = true) + every { registry.hasComponent(AxoniqPlatformConfiguration::class.java) } returns true + every { registry.hasComponent(DeadLetterManager::class.java) } returns true + every { registry.hasComponent(ProcessingGroupInfoSource::class.java) } returns false + + enhancer.enhance(registry) + + verify(exactly = 0) { registry.registerComponent(any>()) } + } + + @Test + fun `idempotent — no registrations when ProcessingGroupInfoSource is already registered`() { + val registry = mockk(relaxed = true) + every { registry.hasComponent(AxoniqPlatformConfiguration::class.java) } returns true + every { registry.hasComponent(DeadLetterManager::class.java) } returns false + every { registry.hasComponent(ProcessingGroupInfoSource::class.java) } returns true + + enhancer.enhance(registry) + + verify(exactly = 0) { registry.registerComponent(any>()) } + } + + // Note: the "Spring-path" branch in `register(...)` that skips re-registering + // ProcessingGroupInfoSource when it's already exposed by the Spring-backed registry is not + // reachable on the current code path — the top-level guard above bails out if EITHER + // DeadLetterManager or ProcessingGroupInfoSource is already registered. The pair of + // idempotency tests above therefore cover both flags of that combined guard. If the + // top-level guard is ever loosened, this branch would need its own dedicated test. + + @Test + fun `order is PLATFORM_ENHANCER_ORDER + 1 so the platform client components are visible`() { + // RSocketDlqResponder needs RSocketHandlerRegistrar at start-time; that component is + // registered by the main platform enhancer, so this enhancer must run after it. + assertEquals(AxoniqPlatformConfigurerEnhancer.PLATFORM_ENHANCER_ORDER + 1, enhancer.order()) + } +} diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/DeadLetterManagerTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/DeadLetterManagerTest.kt new file mode 100644 index 00000000..434b3b6d --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/DeadLetterManagerTest.kt @@ -0,0 +1,786 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventprocessor + +import io.axoniq.framework.messaging.deadletter.Cause +import io.axoniq.framework.messaging.deadletter.DeadLetter +import io.axoniq.framework.messaging.deadletter.SequencedDeadLetterProcessor +import io.axoniq.framework.messaging.deadletter.SequencedDeadLetterQueue +import io.axoniq.platform.framework.api.AxoniqConsoleDlqMode +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import org.apache.commons.codec.digest.DigestUtils +import org.axonframework.common.configuration.Configuration +import org.axonframework.messaging.core.Metadata +import org.axonframework.messaging.core.MessageType +import org.axonframework.messaging.eventhandling.EventHandlingComponent +import org.axonframework.messaging.eventhandling.EventMessage +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertNotEquals +import org.junit.jupiter.api.Assertions.assertThrows +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test +import java.time.Instant +import java.util.Optional +import java.util.concurrent.CompletableFuture + +/** + * Unit tests for [DeadLetterManager]. Behaviour is grouped under [Nested] inner classes so each + * area (discovery, sequence-identifier resolution, pagination, mutations, payload handling, DLQ + * modes, error paths) is easy to scan and runs in isolation. + * + * The realistic end-to-end flow lives in [RSocketDlqResponderIntegrationTest], which boots a real + * AF5 configuration with a Pooled Streaming processor and a deliberately-failing event handler. + * These unit tests intentionally cover mockable edge cases the integration test cannot easily + * exercise (negative pagination arguments, ByteArray payload-type fallback, every DlqMode). + */ +class DeadLetterManagerTest { + + @Nested + inner class Discovery { + + @Test + fun `exposes processor name as processing group when the processor has a single DLQ`() { + val dlq = fakeDlq() + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + ) + + val infos = manager.infoFor("orders") + + assertEquals(listOf("orders"), infos.map { it.processingGroup }) + } + + @Test + fun `exposes processor__component identifier when a processor has multiple DLQs`() { + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(), + "DeadLetterQueue[EventHandlingComponent[orders][AuditProjector]]" to fakeDlq(), + ) + + val infos = manager.infoFor("orders") + + assertEquals( + setOf("orders::OrderProjector", "orders::AuditProjector"), + infos.map { it.processingGroup }.toSet(), + ) + } + + @Test + fun `ignores components whose names do not match the DLQ pattern`() { + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(), + "SomeOtherComponent" to fakeDlq(), + "DeadLetterQueue[Other][format]" to fakeDlq(), + ) + + assertEquals(listOf("orders"), manager.infoFor("orders").map { it.processingGroup }) + } + + @Test + fun `infoFor returns only DLQs belonging to the requested processor`() { + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequenceCount = 3), + "DeadLetterQueue[EventHandlingComponent[shipping][ShipmentProjector]]" to fakeDlq(sequenceCount = 7), + ) + + val ordersInfo = manager.infoFor("orders") + val shippingInfo = manager.infoFor("shipping") + + assertEquals(listOf("orders" to 3L), ordersInfo.map { it.processingGroup to it.dlqSize }) + assertEquals(listOf("shipping" to 7L), shippingInfo.map { it.processingGroup to it.dlqSize }) + } + } + + @Nested + inner class SequenceIdentifiers { + + @Test + fun `String result from policy is used verbatim as the sequence identifier`() { + val ehc = ehcWithPolicy { _ -> "saga-42" } + val sequence = listOf(fakeLetter("m1"), fakeLetter("m2")) + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = ehc, + ) + + val response = manager.deadLetters("orders") + + assertEquals(listOf("saga-42", "saga-42"), response.sequences[0].map { it.sequenceIdentifier }) + } + + @Test + fun `non-String result from policy is reduced to hashCode toString`() { + val payloadObject = 12345 + val ehc = ehcWithPolicy { _ -> payloadObject } + val sequence = listOf(fakeLetter("m1")) + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = ehc, + ) + + val response = manager.deadLetters("orders") + + assertEquals(payloadObject.hashCode().toString(), response.sequences[0][0].sequenceIdentifier) + } + + @Test + fun `null result from policy falls back to message identifier`() { + val ehc = ehcWithPolicy { _ -> null } + val sequence = listOf(fakeLetter("m1")) + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = ehc, + ) + + val response = manager.deadLetters("orders") + + assertEquals("m1", response.sequences[0][0].sequenceIdentifier) + } + + @Test + fun `policy is invoked once per sequence (using the first letter) and the id is stamped across the sequence`() { + val ehc = ehcWithPolicy { event -> event.identifier() } + val sequence = listOf(fakeLetter("m1"), fakeLetter("m2"), fakeLetter("m3")) + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = ehc, + ) + + val response = manager.deadLetters("orders") + + assertEquals(listOf("m1", "m1", "m1"), response.sequences[0].map { it.sequenceIdentifier }) + } + + @Test + fun `when no EventHandlingComponent is registered the manager falls back to the letter's message id`() { + val sequence = listOf(fakeLetter("only-letter")) + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = null, + ) + + val response = manager.deadLetters("orders") + + assertEquals("only-letter", response.sequences[0][0].sequenceIdentifier) + } + + @Test + fun `empty sequence yields an empty letter list without crashing`() { + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(emptyList())), + ) + + val response = manager.deadLetters("orders") + + assertEquals(1, response.sequences.size) + assertTrue(response.sequences[0].isEmpty()) + } + } + + @Nested + inner class Pagination { + + @Test + fun `lettersForSequence returns the requested slice in order with the correct total`() { + val sequence = (1..5).map { fakeLetter(messageId = "m$it", payload = "p$it") } + val ehc = ehcWithPolicy { event -> event.identifier() } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = ehc, + ) + + val response = manager.lettersForSequence("orders", "m1", offset = 1, size = 2) + + assertEquals(5L, response.totalCount) + assertEquals(listOf("m2", "m3"), response.letters.map { it.messageIdentifier }) + } + + @Test + fun `lettersForSequence coerces a negative offset to zero`() { + val sequence = (1..3).map { fakeLetter("m$it") } + val ehc = ehcWithPolicy { event -> event.identifier() } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = ehc, + ) + + val response = manager.lettersForSequence("orders", "m1", offset = -5, size = 2) + + assertEquals(listOf("m1", "m2"), response.letters.map { it.messageIdentifier }) + } + + @Test + fun `lettersForSequence coerces a non-positive size to one`() { + val sequence = (1..3).map { fakeLetter("m$it") } + val ehc = ehcWithPolicy { event -> event.identifier() } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = ehc, + ) + + val response = manager.lettersForSequence("orders", "m1", offset = 0, size = 0) + + assertEquals(1, response.letters.size) + assertEquals("m1", response.letters[0].messageIdentifier) + } + + @Test + fun `lettersForSequence returns an empty response when no sequence matches the id`() { + val sequence = listOf(fakeLetter("real")) + val ehc = ehcWithPolicy { event -> event.identifier() } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = ehc, + ) + + val response = manager.lettersForSequence("orders", "stale-id", 0, 10) + + assertTrue(response.letters.isEmpty()) + assertEquals(0L, response.totalCount) + } + + @Test + fun `lettersForSequence caps the slice at the size argument even when the sequence is larger`() { + val sequence = (1..10).map { fakeLetter("m$it") } + val ehc = ehcWithPolicy { event -> event.identifier() } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = ehc, + ) + + val response = manager.lettersForSequence("orders", "m1", offset = 0, size = 3) + + assertEquals(3, response.letters.size) + assertEquals(10L, response.totalCount) + } + } + + @Nested + inner class Mutations { + + @Test + fun `delete by sequence evicts every letter in that sequence`() { + val letters = listOf(fakeLetter("m1"), fakeLetter("m2"), fakeLetter("m3")) + val dlq = fakeDlq(sequences = listOf(letters)) + val ehc = ehcWithPolicy { event -> event.identifier() } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + ehc = ehc, + ) + + val evicted = manager.delete("orders", "m1") + + assertEquals(3, evicted) + letters.forEach { verify(exactly = 1) { dlq.evict(it, null) } } + } + + @Test + fun `delete by sequence is a no-op when the sequence does not exist`() { + val dlq = fakeDlq(sequences = listOf(listOf(fakeLetter("real")))) + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + ) + + val evicted = manager.delete("orders", "ghost") + + assertEquals(0, evicted) + verify(exactly = 0) { dlq.evict(any>(), any()) } + } + + @Test + fun `delete by message evicts only the matching letter`() { + val letter1 = fakeLetter("m1") + val letter2 = fakeLetter("m2") + val dlq = fakeDlq(sequences = listOf(listOf(letter1, letter2))) + val ehc = ehcWithPolicy { event -> event.identifier() } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + ehc = ehc, + ) + + val evicted = manager.delete("orders", "m1", "m2") + + assertTrue(evicted) + verify(exactly = 1) { dlq.evict(letter2, null) } + verify(exactly = 0) { dlq.evict(letter1, null) } + } + + @Test + fun `delete by message is a no-op when the message id is unknown in the sequence`() { + val letter1 = fakeLetter("m1") + val dlq = fakeDlq(sequences = listOf(listOf(letter1))) + val ehc = ehcWithPolicy { event -> event.identifier() } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + ehc = ehc, + ) + + val evicted = manager.delete("orders", "m1", "missing") + + assertFalse(evicted) + verify(exactly = 0) { dlq.evict(any>(), any()) } + } + + @Test + fun `delete by message is a no-op when the sequence does not resolve`() { + val dlq = fakeDlq(sequences = listOf(listOf(fakeLetter("real")))) + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + ) + + val evicted = manager.delete("orders", "ghost", "anything") + + assertFalse(evicted) + verify(exactly = 0) { dlq.evict(any>(), any()) } + } + + @Test + fun `deleteAll returns the queue size and clears the queue`() { + val dlq = fakeDlq(totalSize = 42L) + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + ) + + val deleted = manager.deleteAll("orders") + + assertEquals(42, deleted) + verify(exactly = 1) { dlq.clear(null) } + } + + @Test + fun `sequenceSize returns the count of letters for the matching id`() { + val sequence = (1..4).map { fakeLetter("m$it") } + val ehc = ehcWithPolicy { event -> event.identifier() } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = ehc, + ) + + assertEquals(4L, manager.sequenceSize("orders", "m1")) + } + + @Test + fun `sequenceSize returns zero when the id does not resolve`() { + val sequence = listOf(fakeLetter("real")) + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ) + + assertEquals(0L, manager.sequenceSize("orders", "ghost")) + } + } + + @Nested + inner class PayloadHandling { + + @Test + fun `payload at or below 1024 UTF-8 bytes is returned untouched`() { + val payload = "x".repeat(1024) + val sequence = listOf(fakeLetter("m1", payload = payload)) + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ) + + val response = manager.deadLetters("orders") + + assertEquals(payload, response.sequences[0][0].message) + } + + @Test + fun `payload over 1024 UTF-8 bytes is truncated without splitting a multi-byte codepoint`() { + val char = "č" + val payload = char.repeat(600) // 600 * 2 = 1200 bytes + val sequence = listOf(fakeLetter("m1", payload = payload)) + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ) + + val result = manager.deadLetters("orders").sequences[0][0].message + + assertTrue(result.toByteArray(Charsets.UTF_8).size <= 1024) + assertFalse(result.contains('�')) + assertTrue(result.all { it == 'č' }) + } + + @Test + fun `messageType uses the qualified name carried on the message type (primary in AF5)`() { + val message = fakeEventMessage( + id = "m1", + payload = "still serialised".toByteArray(), + payloadType = ByteArray::class.java, + typeName = "com.example.OrderPlaced", + ) + val letter = fakeLetterFromMessage(message) + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(listOf(letter))), + ) + + val apiLetter = manager.deadLetters("orders").sequences[0][0] + + assertEquals("com.example.OrderPlaced", apiLetter.messageType) + } + + @Test + fun `messageType falls back to payload class fqn when the message type lookup throws`() { + val message = mockk(relaxed = true) + every { message.identifier() } returns "m1" + every { message.payload() } returns "hello" + every { message.payloadType() } returns String::class.java + every { message.type() } throws IllegalStateException("no type") + val letter = fakeLetterFromMessage(message) + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(listOf(letter))), + ) + + val apiLetter = manager.deadLetters("orders").sequences[0][0] + + assertEquals("java.lang.String", apiLetter.messageType) + } + } + + @Nested + inner class DlqModes { + + @Test + fun `NONE returns an empty list of sequences but still reports the total`() { + val sequence = listOf(fakeLetter("m1"), fakeLetter("m2")) + val ehc = ehcWithPolicy { event -> event.identifier() } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = ehc, + dlqMode = AxoniqConsoleDlqMode.NONE, + ) + + val response = manager.deadLetters("orders") + + assertTrue(response.sequences.isEmpty()) + assertEquals(1L, response.totalCount) + } + + @Test + fun `NONE returns an empty SequenceLettersResponse`() { + val sequence = listOf(fakeLetter("m1")) + val ehc = ehcWithPolicy { event -> event.identifier() } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = ehc, + dlqMode = AxoniqConsoleDlqMode.NONE, + ) + + val response = manager.lettersForSequence("orders", "m1", 0, 10) + + assertTrue(response.letters.isEmpty()) + assertEquals(0L, response.totalCount) + } + + @Test + fun `LIMITED strips payload and cause message, keeps sequence id unhashed`() { + val sequence = listOf(fakeLetter("m1", payload = "secret-payload")) + val ehc = ehcWithPolicy { event -> event.identifier() } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = ehc, + dlqMode = AxoniqConsoleDlqMode.LIMITED, + ) + + val letter = manager.deadLetters("orders").sequences[0][0] + + assertEquals("", letter.message) + assertEquals("", letter.causeMessage) + assertEquals("m1", letter.sequenceIdentifier) + } + + @Test + fun `LIMITED filters diagnostics down to the configured whitelist`() { + val whitelisted = mapOf("attempts" to "3", "cause" to "boom", "internal" to "shh") + val sequence = listOf(fakeLetter("m1", diagnostics = whitelisted)) + val ehc = ehcWithPolicy { event -> event.identifier() } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = ehc, + dlqMode = AxoniqConsoleDlqMode.LIMITED, + whitelist = listOf("attempts", "cause"), + ) + + val diagnostics = manager.deadLetters("orders").sequences[0][0].diagnostics + + assertEquals(setOf("attempts", "cause"), diagnostics.keys) + assertFalse(diagnostics.containsKey("internal")) + } + + @Test + fun `LIMITED with empty whitelist drops every diagnostic`() { + val sequence = listOf(fakeLetter("m1", diagnostics = mapOf("a" to "1", "b" to "2"))) + val ehc = ehcWithPolicy { event -> event.identifier() } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = ehc, + dlqMode = AxoniqConsoleDlqMode.LIMITED, + ) + + val diagnostics = manager.deadLetters("orders").sequences[0][0].diagnostics + + assertTrue(diagnostics.isEmpty()) + } + + @Test + fun `MASKED returns MASKED markers and SHA-256 hashes the sequence id`() { + val sequence = listOf(fakeLetter("m1", payload = "secret")) + val ehc = ehcWithPolicy { _ -> "saga-123" } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = ehc, + dlqMode = AxoniqConsoleDlqMode.MASKED, + ) + + val letter = manager.deadLetters("orders").sequences[0][0] + + assertEquals("", letter.message) + assertEquals("", letter.causeMessage) + assertEquals(DigestUtils.sha256Hex("saga-123"), letter.sequenceIdentifier) + assertNotEquals("saga-123", letter.sequenceIdentifier) + assertTrue(letter.diagnostics.isEmpty()) + } + + @Test + fun `MASKED still allows delete-by-sequence using the hashed id`() { + val letters = listOf(fakeLetter("m1"), fakeLetter("m2")) + val dlq = fakeDlq(sequences = listOf(letters)) + val ehc = ehcWithPolicy { _ -> "saga-xyz" } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + ehc = ehc, + dlqMode = AxoniqConsoleDlqMode.MASKED, + ) + + val hashedId = DigestUtils.sha256Hex("saga-xyz") + val evicted = manager.delete("orders", hashedId) + + assertEquals(2, evicted) + } + + @Test + fun `MASKED lettersForSequence returns the paginated slice when looked up by the hashed id`() { + val letters = (1..5).map { fakeLetter("m$it", payload = "payload-$it") } + val dlq = fakeDlq(sequences = listOf(letters)) + val ehc = ehcWithPolicy { _ -> "saga-abc" } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + ehc = ehc, + dlqMode = AxoniqConsoleDlqMode.MASKED, + ) + val hashedId = DigestUtils.sha256Hex("saga-abc") + + val response = manager.lettersForSequence("orders", hashedId, offset = 0, size = 3) + + assertEquals(5L, response.totalCount) + assertEquals(3, response.letters.size) + response.letters.forEach { assertEquals(hashedId, it.sequenceIdentifier) } + } + + @Test + fun `FULL preserves payload, cause message and raw sequence id`() { + val sequence = listOf(fakeLetter("m1", payload = "fully-visible")) + val ehc = ehcWithPolicy { _ -> "saga-99" } + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequences = listOf(sequence)), + ehc = ehc, + dlqMode = AxoniqConsoleDlqMode.FULL, + ) + + val letter = manager.deadLetters("orders").sequences[0][0] + + assertEquals("fully-visible", letter.message) + assertEquals("boom", letter.causeMessage) + assertEquals("saga-99", letter.sequenceIdentifier) + } + } + + @Nested + inner class Errors { + + @Test + fun `unknown processing group throws IllegalArgumentException`() { + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(), + ) + + assertThrows(IllegalArgumentException::class.java) { + manager.sequenceSize("unknown-group", "whatever") + } + } + + @Test + fun `NONE mode never reaches toApiLetter — bypassing the short-circuit throws IllegalStateException`() { + val manager = managerWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(), + dlqMode = AxoniqConsoleDlqMode.NONE, + ) + val toApiLetter = DeadLetterManager::class.java.declaredMethods + .single { it.name == "toApiLetter" } + .apply { isAccessible = true } + val letter = fakeLetter("m1") + + val thrown = assertThrows(java.lang.reflect.InvocationTargetException::class.java) { + toApiLetter.invoke(manager, letter, "any-id") + } + assertTrue(thrown.cause is IllegalStateException) + } + } + + // --------------------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------------------- + + /** + * Builds a manager backed by a synthetic configuration that exposes the given DLQs and matching + * processors / event-handling components. Pass [ehc] = `null` (the default) to use a delegate + * whose `sequenceIdentifierFor` returns null — exercises the manager's policy-null → message-id + * fallback path. + */ + private fun managerWith( + vararg dlqsByName: Pair>, + ehc: EventHandlingComponent? = null, + dlqMode: AxoniqConsoleDlqMode = AxoniqConsoleDlqMode.FULL, + whitelist: List = emptyList(), + ): DeadLetterManager { + val configuration = configurationWith(dlqsByName.asList(), ehc) + return DeadLetterManager(configuration, dlqMode, whitelist).also { it.start() } + } + + private fun configurationWith( + dlqsByName: List>>, + ehc: EventHandlingComponent?, + ): Configuration { + val module = mockk(relaxed = true) + every { module.getComponents(SequencedDeadLetterQueue::class.java) } returns + dlqsByName.toMap().mapValues { it.value as SequencedDeadLetterQueue<*> } + dlqsByName.forEach { (name, _) -> + val match = Regex("""^DeadLetterQueue\[EventHandlingComponent\[([^]]+)]\[(.+)]]$""").find(name) + if (match != null) { + val ehcName = "EventHandlingComponent[${match.groupValues[1]}][${match.groupValues[2]}]" + + val delegate = ehc ?: defaultDelegateMock() + val wrapper = AxoniqPlatformEventHandlingComponent( + delegate, + match.groupValues[1], + mockk(relaxed = true), + mockk(relaxed = true), + ) + every { + module.getOptionalComponent(EventHandlingComponent::class.java, ehcName) + } returns Optional.of(wrapper) + } + } + val root = mockk(relaxed = true) + every { root.moduleConfigurations } returns listOf(module) + return root + } + + private fun defaultDelegateMock(): EventHandlingComponent { + val mock = mockk( + moreInterfaces = arrayOf(SequencedDeadLetterProcessor::class), + relaxed = true, + ) + @Suppress("UNCHECKED_CAST") + every { mock.sequenceIdentifierFor(any(), any()) } answers { null as Any } + return mock + } + + private fun ehcWithPolicy(policy: (EventMessage) -> Any?): EventHandlingComponent { + val ehc = mockk( + moreInterfaces = arrayOf(SequencedDeadLetterProcessor::class), + relaxed = true, + ) + + @Suppress("UNCHECKED_CAST") + every { ehc.sequenceIdentifierFor(any(), any()) } answers { + policy(firstArg()) as Any + } + return ehc + } + + private fun fakeDlq( + sequences: List>> = emptyList(), + sequenceCount: Long = sequences.size.toLong(), + totalSize: Long = sequences.sumOf { it.size.toLong() }, + ): SequencedDeadLetterQueue { + val dlq = mockk>(relaxed = true) + every { dlq.deadLetters(null) } returns CompletableFuture.completedFuture( + sequences as Iterable>>, + ) + every { dlq.amountOfSequences(null) } returns CompletableFuture.completedFuture(sequenceCount) + every { dlq.size(null) } returns CompletableFuture.completedFuture(totalSize) + every { dlq.clear(null) } returns CompletableFuture.completedFuture(null) + every { dlq.evict(any>(), null) } returns CompletableFuture.completedFuture(null) + return dlq + } + + private fun fakeLetter( + messageId: String, + payload: Any? = "payload-$messageId", + payloadType: Class<*> = (payload?.javaClass ?: String::class.java), + causeType: String? = "java.lang.RuntimeException", + causeMessage: String? = "boom", + diagnostics: Map = emptyMap(), + ): DeadLetter { + val message = fakeEventMessage(messageId, payload, payloadType) + return fakeLetterFromMessage(message, causeType, causeMessage, diagnostics) + } + + private fun fakeLetterFromMessage( + message: EventMessage, + causeType: String? = "java.lang.RuntimeException", + causeMessage: String? = "boom", + diagnostics: Map = emptyMap(), + ): DeadLetter { + val letter = mockk>(relaxed = true) + every { letter.message() } returns message + val cause: Optional = if (causeType == null) Optional.empty() else { + val c = mockk() + every { c.type() } returns causeType + every { c.message() } returns (causeMessage ?: "") + Optional.of(c) + } + every { letter.cause() } returns cause + every { letter.enqueuedAt() } returns Instant.EPOCH + every { letter.lastTouched() } returns Instant.EPOCH + every { letter.diagnostics() } returns + if (diagnostics.isEmpty()) Metadata.emptyInstance() else Metadata.from(diagnostics) + return letter + } + + private fun fakeEventMessage( + id: String, + payload: Any?, + payloadType: Class<*>, + typeName: String = "com.example.${payloadType.simpleName ?: "Anonymous"}", + ): EventMessage { + val message = mockk(relaxed = true) + every { message.identifier() } returns id + every { message.payload() } returns payload + every { message.payloadType() } returns payloadType + val type = mockk() + every { type.name() } returns typeName + every { message.type() } returns type + return message + } +} diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/ProcessorReportCreatorTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/ProcessorReportCreatorTest.kt new file mode 100644 index 00000000..9f6ecf21 --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/ProcessorReportCreatorTest.kt @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventprocessor + +import io.axoniq.platform.framework.api.ProcessingGroupStatus +import io.mockk.every +import io.mockk.mockk +import org.axonframework.common.configuration.Configuration +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import java.util.Optional + +/** + * Focused unit tests for the [ProcessingGroupInfoSource] integration in [ProcessorReportCreator]. + * The full `createReport()` flow needs a live AF5 configuration with processors and segment + * tracker statuses — that's integration territory and not what we're testing here. Instead we + * reach into the private `processingGroupsFor` via reflection so we can exercise the optional + * source seam in isolation. + * + * Reflection is the right tool here because the production class deliberately keeps this + * method private (it's a callee of `streamingStatus(...)` only), and widening visibility just + * for tests would leak the seam into the public surface. + */ +class ProcessorReportCreatorTest { + + @Test + fun `processingGroupsFor returns empty when no ProcessingGroupInfoSource is registered`() { + val config = baseConfigurationWith(infoSource = Optional.empty()) + val creator = ProcessorReportCreator(config) + + val result = invokeProcessingGroupsFor(creator, "orders") + + assertEquals(emptyList(), result) + } + + @Test + fun `processingGroupsFor maps source infos to ProcessingGroupStatus entries`() { + val source = mockk() + every { source.infoFor("orders") } returns listOf( + ProcessingGroupInfoSource.ProcessingGroupInfo("orders", 3L), + ProcessingGroupInfoSource.ProcessingGroupInfo("orders::audit", 0L), + ) + val config = baseConfigurationWith(infoSource = Optional.of(source)) + val creator = ProcessorReportCreator(config) + + val result = invokeProcessingGroupsFor(creator, "orders") + + assertEquals( + listOf( + ProcessingGroupStatus("orders", 3L), + ProcessingGroupStatus("orders::audit", 0L), + ), + result, + ) + } + + @Test + fun `processingGroupsFor swallows source exceptions and returns empty list`() { + // A failing probe must not break processor reporting — the warning is logged but the + // caller receives an empty list and the rest of the report still renders. + val source = mockk() + every { source.infoFor("orders") } throws RuntimeException("boom") + val config = baseConfigurationWith(infoSource = Optional.of(source)) + val creator = ProcessorReportCreator(config) + + val result = invokeProcessingGroupsFor(creator, "orders") + + assertEquals(emptyList(), result) + } + + /** + * The constructor calls `getComponent(ProcessorMetricsRegistry::class.java)` and + * `getOptionalComponent(ProcessingGroupInfoSource::class.java)`. We provide just enough of + * each so construction succeeds; the metrics registry isn't exercised by the test path + * (no segments => no metrics lookups), so a relaxed mock is fine. + */ + private fun baseConfigurationWith(infoSource: Optional): Configuration { + val config = mockk(relaxed = true) + every { config.getComponent(ProcessorMetricsRegistry::class.java) } returns ProcessorMetricsRegistry() + every { config.getOptionalComponent(ProcessingGroupInfoSource::class.java) } returns infoSource + return config + } + + @Suppress("UNCHECKED_CAST") + private fun invokeProcessingGroupsFor(creator: ProcessorReportCreator, processorName: String): List { + val method = ProcessorReportCreator::class.java.getDeclaredMethod("processingGroupsFor", String::class.java) + method.isAccessible = true + return method.invoke(creator, processorName) as List + } +} diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/RSocketDlqResponderIntegrationTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/RSocketDlqResponderIntegrationTest.kt new file mode 100644 index 00000000..d24369eb --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/RSocketDlqResponderIntegrationTest.kt @@ -0,0 +1,277 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventprocessor + +import io.axoniq.framework.messaging.deadletter.GenericDeadLetter +import io.axoniq.framework.messaging.deadletter.SequencedDeadLetterQueue +import io.axoniq.framework.messaging.eventhandling.deadletter.DeadLetterQueueConfiguration +import io.axoniq.platform.framework.api.AxoniqConsoleDlqMode +import io.axoniq.platform.framework.messaging.HandlerMetricsRegistry +import io.mockk.mockk +import org.axonframework.common.configuration.AxonConfiguration +import org.axonframework.common.configuration.ComponentDefinition +import org.axonframework.common.configuration.DecoratorDefinition +import org.axonframework.eventsourcing.configuration.EventSourcingConfigurer +import org.axonframework.messaging.core.MessageType +import org.axonframework.messaging.core.QualifiedName +import org.axonframework.messaging.core.sequencing.SequencingPolicy +import org.axonframework.messaging.eventhandling.EventHandlingComponent +import org.axonframework.messaging.eventhandling.EventMessage +import org.axonframework.messaging.eventhandling.GenericEventMessage +import org.axonframework.messaging.eventhandling.SimpleEventHandlingComponent +import org.axonframework.messaging.eventhandling.processing.streaming.pooled.PooledStreamingEventProcessorConfiguration +import org.axonframework.messaging.eventhandling.processing.streaming.pooled.PooledStreamingEventProcessorModule +import org.axonframework.messaging.eventhandling.processing.streaming.segmenting.SequenceOverridingEventHandlingComponent +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.util.Optional + +/** + * End-to-end integration test for the AF5 DLQ inspection wiring. + * + * Boots a real AF5 configuration (via [EventSourcingConfigurer]) with a single Pooled Streaming + * event processor whose [SimpleEventHandlingComponent] is wrapped in a + * [SequenceOverridingEventHandlingComponent] using a deterministic [SequencingPolicy]. With the + * DLQ enabled on the processor, the framework materialises a real [SequencedDeadLetterQueue] + * under the `DeadLetterQueue[EventHandlingComponent[][]]` name that + * [DeadLetterManager] discovers. + * + * Rather than driving the processor end-to-end (which requires an event store, an event sink, and + * deterministic failure timing), we enqueue letters directly onto the materialised DLQ — the bit + * the test is verifying is the discovery + sequence-identifier resolution + mutation flow, not the + * processor's own machinery. + * + * What we assert: + * - the manager exposes the processor name as a single processing group; + * - `deadLetters(...)` returns the policy-derived sequence identifier (NOT the synthetic first + * message id, which is the bug the AF4 sequencing policy port fixed); + * - the sequence id stays stable across `delete(seq, messageId)` — deleting the first letter + * must not rename the sequence; + * - `lettersForSequence(...)` paginates correctly; + * - `delete(seq)` evicts every letter. + * + * DLQ-mode behaviour is intentionally out of scope here — that's covered exhaustively in the + * unit-test [DeadLetterManagerTest]'s `DlqModes` nested class, which can mock cheaply. + */ +class RSocketDlqResponderIntegrationTest { + + companion object { + private const val PROCESSOR_NAME = "audit" + private const val COMPONENT_NAME = "AuditProjector" + private const val SEQUENCE_ID = "tenant-42" + private val EVENT_NAME = QualifiedName(TestEvent::class.java) + } + + /** + * Payload type used by the integration test. A real AF5 `EventMessage` requires a `MessageType` + * which we derive from this class via [QualifiedName]. + */ + data class TestEvent(val value: String) + + private lateinit var configuration: AxonConfiguration + private lateinit var manager: DeadLetterManager + + @BeforeEach + fun boot() { + configuration = EventSourcingConfigurer.create() + .componentRegistry { registry -> + registry.registerComponent(ComponentDefinition.ofType(DeadLetterManager::class.java) + // FULL exposure so the assertions on payload + sequence id can run. The + // production default is now NONE (see AxoniqPlatformConfiguration#dlqMode). + .withBuilder { c -> DeadLetterManager(c, AxoniqConsoleDlqMode.FULL) }) + registry.registerComponent(ComponentDefinition.ofType(HandlerMetricsRegistry::class.java) + .withBuilder { mockk(relaxed = true) }) + registry.registerComponent(ComponentDefinition.ofType(ProcessorMetricsRegistry::class.java) + .withBuilder { ProcessorMetricsRegistry() }) + } + .messaging { messaging -> + messaging.eventProcessing { eventProcessing -> + eventProcessing.pooledStreaming { psep -> + psep.processor( + PooledStreamingEventProcessorModule(PROCESSOR_NAME) + .eventHandlingComponents { components -> + components.declarative(COMPONENT_NAME) { _ -> + SequenceOverridingEventHandlingComponent( + constantSequencingPolicy(), + SimpleEventHandlingComponent.create(COMPONENT_NAME), + ) + } + } + .customized { _, cfg -> + cfg.extend( + DeadLetterQueueConfiguration::class.java, + ) { DeadLetterQueueConfiguration().enabled() } + } + .componentRegistry { moduleRegistry -> + moduleRegistry.registerDecorator( + DecoratorDefinition.forType(EventHandlingComponent::class.java) + .with { cc, _, delegate -> + AxoniqPlatformEventHandlingComponent( + delegate, + PROCESSOR_NAME, + cc.getComponent(HandlerMetricsRegistry::class.java), + cc.getComponent(ProcessorMetricsRegistry::class.java), + ) + } + .order(0), + ) + }, + ) + } + } + } + .build() + configuration.start() + manager = configuration.getComponent(DeadLetterManager::class.java) + manager.start() + // Push two letters with the same policy-derived sequence id so we can test pagination and + // confirm the id remains stable as letters get evicted. + val dlq = resolveDlq() + enqueue(dlq, sequenceId = SEQUENCE_ID, "letter-1") + enqueue(dlq, sequenceId = SEQUENCE_ID, "letter-2") + enqueue(dlq, sequenceId = SEQUENCE_ID, "letter-3") + } + + @AfterEach + fun shutdown() { + configuration.shutdown() + } + + @Test + fun `manager discovers the configured processor as a processing group`() { + val infos = manager.infoFor(PROCESSOR_NAME) + assertEquals(1, infos.size) + assertEquals(PROCESSOR_NAME, infos[0].processingGroup) + } + + @Test + fun `the framework wraps the EHC in a caching decorator and the manager resolves through it`() { + // Sanity check that exercises the live decorator chain: AF5 wraps every EHC with + // SequenceCachingEventHandlingComponent whose sequenceIdentifierFor reads a per-event + // resource off a ProcessingContext. The manager spins up a real UnitOfWork on every + // sequence-id resolution so the caching decorator gets the non-null context it requires. + // If this regresses, the four flow tests below also fail — this test fails first with a + // clearer signal because it bypasses pagination. + val expectedEhcName = "EventHandlingComponent[$PROCESSOR_NAME][$COMPONENT_NAME]" + val ehc = configuration.moduleConfigurations.asSequence() + .mapNotNull { module -> + module.getOptionalComponent( + org.axonframework.messaging.eventhandling.EventHandlingComponent::class.java, + expectedEhcName, + ).orElse(null) + } + .firstOrNull() + assertNotNull(ehc, "Expected an EventHandlingComponent registered as [$expectedEhcName]") + // Drive the resolution through the public API rather than poking at the manager's + // internals — that's the contract the rest of the production code relies on. + val firstLetterSequenceId = manager.deadLetters(PROCESSOR_NAME).sequences[0][0].sequenceIdentifier + assertEquals(SEQUENCE_ID, firstLetterSequenceId) + } + + @Test + fun `deadLetters returns the policy-derived sequence id (not the first letter's message id)`() { + val response = manager.deadLetters(PROCESSOR_NAME) + assertEquals(1, response.sequences.size) + val letters = response.sequences[0] + // Helpful failure context: surface the actual ids when the assertion fails so future + // breakage points at "got " vs "got " without re-running with a debugger. + val seqIds = letters.map { it.sequenceIdentifier } + assertTrue( + letters.all { it.sequenceIdentifier == SEQUENCE_ID }, + "Expected every letter in the sequence to carry policy id [$SEQUENCE_ID] but got $seqIds", + ) + assertTrue(letters.first().messageIdentifier != SEQUENCE_ID) + } + + @Test + fun `lettersForSequence paginates within the sequence`() { + val response = manager.lettersForSequence(PROCESSOR_NAME, SEQUENCE_ID, offset = 1, size = 1) + assertEquals(3L, response.totalCount) + assertEquals(1, response.letters.size) + } + + @Test + fun `sequence id stays stable across delete-letter (the AF4 regression that motivated this rewrite)`() { + // Capture the first letter's message id, delete it, then re-fetch the sequence: the + // sequence id MUST still be `tenant-42`, not the message id of the (now-second) letter. + val firstLetterId = manager.deadLetters(PROCESSOR_NAME).sequences[0][0].messageIdentifier + val evicted = manager.delete(PROCESSOR_NAME, SEQUENCE_ID, firstLetterId) + assertTrue(evicted) + val after = manager.deadLetters(PROCESSOR_NAME).sequences[0] + assertEquals(2, after.size) + assertTrue(after.all { it.sequenceIdentifier == SEQUENCE_ID }) + assertTrue(after.none { it.messageIdentifier == firstLetterId }) + } + + @Test + fun `delete-sequence evicts every letter in the sequence`() { + val evicted = manager.delete(PROCESSOR_NAME, SEQUENCE_ID) + assertEquals(3, evicted) + // After deletion the manager's infoFor returns zero sequences for this processor. + assertEquals(0L, manager.infoFor(PROCESSOR_NAME).single().dlqSize) + } + + // ----------------------------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------------------------- + + /** + * Walks the module configurations the same way [DeadLetterManager.discoverEntries] does and + * returns the materialised DLQ for the test processor's component. Failing here means the + * framework wiring didn't produce a DLQ — likely a misconfigured + * [DeadLetterQueueConfiguration] in [boot]. + */ + @Suppress("UNCHECKED_CAST") + private fun resolveDlq(): SequencedDeadLetterQueue { + val expectedName = "DeadLetterQueue[EventHandlingComponent[$PROCESSOR_NAME][$COMPONENT_NAME]]" + val dlq = configuration.moduleConfigurations.asSequence() + .flatMap { it.getComponents(SequencedDeadLetterQueue::class.java).entries.asSequence() } + .firstOrNull { it.key == expectedName } + ?.value + assertNotNull(dlq, "Expected the framework to materialise a DLQ named [$expectedName]") + return dlq as SequencedDeadLetterQueue + } + + private fun enqueue( + dlq: SequencedDeadLetterQueue, + sequenceId: String, + payloadValue: String, + ) { + val message: EventMessage = GenericEventMessage( + MessageType(EVENT_NAME), + TestEvent(payloadValue), + ) + val letter = GenericDeadLetter( + sequenceId, + message, + RuntimeException("deliberate failure for [$payloadValue]"), + ) + dlq.enqueue(sequenceId, letter, null).join() + } + + /** + * Returns the same sequence identifier for every event in the test, so all enqueued letters + * end up in a single sequence — that's what lets the assertions about pagination and stable + * sequence id work. + */ + private fun constantSequencingPolicy(): SequencingPolicy = + SequencingPolicy { _, _ -> Optional.of(SEQUENCE_ID) } +} diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/RSocketDlqResponderTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/RSocketDlqResponderTest.kt new file mode 100644 index 00000000..e31b4028 --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/RSocketDlqResponderTest.kt @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventprocessor + +import io.axoniq.platform.framework.api.DeadLetterProcessRequest +import io.axoniq.platform.framework.api.DeadLetterRequest +import io.axoniq.platform.framework.api.DeadLetterResponse +import io.axoniq.platform.framework.api.DeadLetterSequenceDeleteRequest +import io.axoniq.platform.framework.api.DeadLetterSequenceSize +import io.axoniq.platform.framework.api.DeadLetterSingleDeleteRequest +import io.axoniq.platform.framework.api.DeleteAllDeadLetterSequencesRequest +import io.axoniq.platform.framework.api.FetchSequenceLettersRequest +import io.axoniq.platform.framework.api.ProcessAllDeadLetterSequencesRequest +import io.axoniq.platform.framework.api.Routes +import io.axoniq.platform.framework.api.SequenceLettersResponse +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar +import io.mockk.CapturingSlot +import io.mockk.every +import io.mockk.just +import io.mockk.mockk +import io.mockk.runs +import io.mockk.slot +import io.mockk.verify +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +/** + * Verifies that [RSocketDlqResponder] registers a handler for every DLQ route on + * [RSocketHandlerRegistrar.registerHandlerWithPayload] and that each captured handler delegates + * to the corresponding [DeadLetterManager] method. We capture the lambda handed to the registrar + * so we exercise both registration AND the handler's body in a single test per route. + * + * The slots are typed as `(T) -> Any` because that is the exact functional shape + * `registerHandlerWithPayload` declares; the production handlers happen to return more specific + * types but the registrar erases them down to `Any`. + */ +class RSocketDlqResponderTest { + + private lateinit var manager: DeadLetterManager + private lateinit var registrar: RSocketHandlerRegistrar + private lateinit var responder: RSocketDlqResponder + + private val letterHandler = slot<(DeadLetterRequest) -> Any>() + private val sequenceSizeHandler = slot<(DeadLetterSequenceSize) -> Any>() + private val sequenceLettersHandler = slot<(FetchSequenceLettersRequest) -> Any>() + private val deleteSequenceHandler = slot<(DeadLetterSequenceDeleteRequest) -> Any>() + private val deleteLetterHandler = slot<(DeadLetterSingleDeleteRequest) -> Any>() + private val processHandler = slot<(DeadLetterProcessRequest) -> Any>() + private val processAllHandler = slot<(ProcessAllDeadLetterSequencesRequest) -> Any>() + private val deleteAllHandler = slot<(DeleteAllDeadLetterSequencesRequest) -> Any>() + + @BeforeEach + fun setUp() { + manager = mockk(relaxed = true) + registrar = mockk(relaxed = true) + captureHandler(Routes.ProcessingGroup.DeadLetter.LETTERS, DeadLetterRequest::class.java, letterHandler) + captureHandler(Routes.ProcessingGroup.DeadLetter.SEQUENCE_SIZE, DeadLetterSequenceSize::class.java, sequenceSizeHandler) + captureHandler(Routes.ProcessingGroup.DeadLetter.SEQUENCE_LETTERS, FetchSequenceLettersRequest::class.java, sequenceLettersHandler) + captureHandler(Routes.ProcessingGroup.DeadLetter.DELETE_SEQUENCE, DeadLetterSequenceDeleteRequest::class.java, deleteSequenceHandler) + captureHandler(Routes.ProcessingGroup.DeadLetter.DELETE_LETTER, DeadLetterSingleDeleteRequest::class.java, deleteLetterHandler) + captureHandler(Routes.ProcessingGroup.DeadLetter.PROCESS, DeadLetterProcessRequest::class.java, processHandler) + captureHandler(Routes.ProcessingGroup.DeadLetter.PROCESS_ALL_SEQUENCES, ProcessAllDeadLetterSequencesRequest::class.java, processAllHandler) + captureHandler(Routes.ProcessingGroup.DeadLetter.DELETE_ALL_SEQUENCES, DeleteAllDeadLetterSequencesRequest::class.java, deleteAllHandler) + + responder = RSocketDlqResponder(manager, registrar) + responder.start() + } + + @Test + fun `start registers a handler for each of the eight DLQ routes`() { + verify(exactly = 1) { registrar.registerHandlerWithPayload(Routes.ProcessingGroup.DeadLetter.LETTERS, DeadLetterRequest::class.java, any()) } + verify(exactly = 1) { registrar.registerHandlerWithPayload(Routes.ProcessingGroup.DeadLetter.SEQUENCE_SIZE, DeadLetterSequenceSize::class.java, any()) } + verify(exactly = 1) { registrar.registerHandlerWithPayload(Routes.ProcessingGroup.DeadLetter.SEQUENCE_LETTERS, FetchSequenceLettersRequest::class.java, any()) } + verify(exactly = 1) { registrar.registerHandlerWithPayload(Routes.ProcessingGroup.DeadLetter.DELETE_SEQUENCE, DeadLetterSequenceDeleteRequest::class.java, any()) } + verify(exactly = 1) { registrar.registerHandlerWithPayload(Routes.ProcessingGroup.DeadLetter.DELETE_LETTER, DeadLetterSingleDeleteRequest::class.java, any()) } + verify(exactly = 1) { registrar.registerHandlerWithPayload(Routes.ProcessingGroup.DeadLetter.PROCESS, DeadLetterProcessRequest::class.java, any()) } + verify(exactly = 1) { registrar.registerHandlerWithPayload(Routes.ProcessingGroup.DeadLetter.PROCESS_ALL_SEQUENCES, ProcessAllDeadLetterSequencesRequest::class.java, any()) } + verify(exactly = 1) { registrar.registerHandlerWithPayload(Routes.ProcessingGroup.DeadLetter.DELETE_ALL_SEQUENCES, DeleteAllDeadLetterSequencesRequest::class.java, any()) } + } + + @Test + fun `LETTERS handler forwards all request parameters to DeadLetterManager deadLetters`() { + val expected = DeadLetterResponse(emptyList(), 7) + every { manager.deadLetters("g", 1, 10, 5) } returns expected + + val result = letterHandler.captured(DeadLetterRequest("g", offset = 1, size = 10, maxSequenceLetters = 5)) + + assertEquals(expected, result) + verify(exactly = 1) { manager.deadLetters("g", 1, 10, 5) } + } + + @Test + fun `SEQUENCE_SIZE handler forwards processing group and sequence id`() { + every { manager.sequenceSize("g", "seq-1") } returns 42L + + val result = sequenceSizeHandler.captured(DeadLetterSequenceSize("g", "seq-1")) + + assertEquals(42L, result) + verify(exactly = 1) { manager.sequenceSize("g", "seq-1") } + } + + @Test + fun `SEQUENCE_LETTERS handler forwards pagination arguments`() { + val expected = SequenceLettersResponse(emptyList(), 0) + every { manager.lettersForSequence("g", "seq-1", 5, 25) } returns expected + + val result = sequenceLettersHandler.captured(FetchSequenceLettersRequest("g", "seq-1", offset = 5, size = 25)) + + assertEquals(expected, result) + verify(exactly = 1) { manager.lettersForSequence("g", "seq-1", 5, 25) } + } + + @Test + fun `DELETE_SEQUENCE handler delegates to DeadLetterManager delete-by-sequence`() { + every { manager.delete("g", "seq-1") } returns 3 + + deleteSequenceHandler.captured(DeadLetterSequenceDeleteRequest("g", "seq-1")) + + verify(exactly = 1) { manager.delete("g", "seq-1") } + } + + @Test + fun `DELETE_LETTER handler delegates to DeadLetterManager delete-by-message`() { + every { manager.delete("g", "seq-1", "msg-1") } returns true + + deleteLetterHandler.captured(DeadLetterSingleDeleteRequest("g", "seq-1", "msg-1")) + + verify(exactly = 1) { manager.delete("g", "seq-1", "msg-1") } + } + + @Test + fun `PROCESS handler returns the manager's result`() { + every { manager.process("g", "msg-1") } returns true + + val result = processHandler.captured(DeadLetterProcessRequest("g", "msg-1")) + + assertEquals(true, result) + verify(exactly = 1) { manager.process("g", "msg-1") } + } + + @Test + fun `PROCESS_ALL_SEQUENCES handler forwards maxMessages`() { + every { manager.processAll("g", 9) } returns 7 + + val result = processAllHandler.captured(ProcessAllDeadLetterSequencesRequest("g", maxMessages = 9)) + + assertEquals(7, result) + verify(exactly = 1) { manager.processAll("g", 9) } + } + + @Test + fun `DELETE_ALL_SEQUENCES handler returns the cleared count`() { + every { manager.deleteAll("g") } returns 11 + + val result = deleteAllHandler.captured(DeleteAllDeadLetterSequencesRequest("g")) + + assertEquals(11, result) + verify(exactly = 1) { manager.deleteAll("g") } + } + + private fun captureHandler(route: String, payloadType: Class, handler: CapturingSlot<(T) -> Any>) { + every { + registrar.registerHandlerWithPayload(route, payloadType, capture(handler)) + } just runs + } +} diff --git a/spring-boot-starter/src/main/java/io/axoniq/console/framework/starter/AxoniqPlatformAutoConfiguration.kt b/spring-boot-starter/src/main/java/io/axoniq/console/framework/starter/AxoniqPlatformAutoConfiguration.kt index 247044b5..af03c8e2 100644 --- a/spring-boot-starter/src/main/java/io/axoniq/console/framework/starter/AxoniqPlatformAutoConfiguration.kt +++ b/spring-boot-starter/src/main/java/io/axoniq/console/framework/starter/AxoniqPlatformAutoConfiguration.kt @@ -57,6 +57,8 @@ class AxoniqPlatformAutoConfiguration { .secure(properties.isSecure) .initialDelay(properties.initialDelay) .domainEventAccessMode(properties.domainEventAccessMode) + .dlqMode(properties.dlqMode) + .also { properties.dlqDiagnosticsWhitelist.forEach(it::addDlqDiagnosticsWhitelistKey) } } private fun getApplicationName(