diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/DeadLetterManager.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/DeadLetterManager.kt index 3ead64f..5f6d465 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/DeadLetterManager.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/DeadLetterManager.kt @@ -356,25 +356,28 @@ class DeadLetterManager @JvmOverloads constructor( } } val perProcessor = parsed.groupingBy { it.processor }.eachCount() - return parsed.map { + return parsed.mapNotNull { val ehcName = "EventHandlingComponent[${it.processor}][${it.component}]" - val processor = it.module - .getOptionalComponent(EventHandlingComponent::class.java, ehcName) - .map { ehc -> - if(ehc is AxoniqPlatformEventHandlingComponent) { - ehc.delegate as? SequencedDeadLetterProcessor<*> - } else null - } - .orElseThrow { - IllegalStateException( - "Component [$ehcName] is not wrapped with dead-letter processing") - } as SequencedDeadLetterProcessor - // The EHC is needed for sequence-identifier resolution. Looking it up here (once per discovery - // run) keeps the hot path on deadLetters/lettersForSequence cheap and matches the "discover once" - // shape of the rest of this manager. val ehc = it.module .getOptionalComponent(EventHandlingComponent::class.java, ehcName) .orElse(null) + val processor = ehc?.findDeadLetterProcessor() + ?: it.module + .getOptionalComponent(SequencedDeadLetterProcessor::class.java, ehcName) + .orElse(null) + ?.let { + @Suppress("UNCHECKED_CAST") + it as SequencedDeadLetterProcessor + } + if (processor == null) { + logger.warn( + "Skipping DLQ for component [{}] — could not resolve a SequencedDeadLetterProcessor " + + "from the registered EventHandlingComponent. Retry/process actions will be " + + "unavailable for this component; deadLetters and delete remain functional.", + ehcName + ) + return@mapNotNull null + } DlqEntry( processingGroup = if (perProcessor[it.processor] == 1) it.processor else "${it.processor}::${it.component}", processorName = it.processor, @@ -386,6 +389,25 @@ class DeadLetterManager @JvmOverloads constructor( } } + @Suppress("UNCHECKED_CAST") + private fun EventHandlingComponent.findDeadLetterProcessor(): SequencedDeadLetterProcessor? { + var current: Any? = this + val seen = mutableSetOf() + while (current != null && seen.add(current)) { + if (current is SequencedDeadLetterProcessor<*>) { + return current as SequencedDeadLetterProcessor + } + current = when (current) { + is AxoniqPlatformEventHandlingComponent -> current.delegate + else -> current.javaClass.declaredFields + .firstOrNull { EventHandlingComponent::class.java.isAssignableFrom(it.type) } + ?.also { it.isAccessible = true } + ?.get(current) + } + } + return null + } + private fun discover(): List = entries ?: discoverEntries().also { entries = it } /**