Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -356,25 +356,28 @@ class DeadLetterManager @JvmOverloads constructor(
}
}
val perProcessor = parsed.groupingBy { it.processor }.eachCount()
return parsed.map {
return parsed.mapNotNull {
val ehcName = "EventHandlingComponent[${it.processor}][${it.component}]"
val processor = it.module
.getOptionalComponent(EventHandlingComponent::class.java, ehcName)
.map { ehc ->
if(ehc is AxoniqPlatformEventHandlingComponent) {
ehc.delegate as? SequencedDeadLetterProcessor<*>
} else null
}
.orElseThrow {
IllegalStateException(
"Component [$ehcName] is not wrapped with dead-letter processing")
} as SequencedDeadLetterProcessor<EventMessage>
// The EHC is needed for sequence-identifier resolution. Looking it up here (once per discovery
// run) keeps the hot path on deadLetters/lettersForSequence cheap and matches the "discover once"
// shape of the rest of this manager.
val ehc = it.module
.getOptionalComponent(EventHandlingComponent::class.java, ehcName)
.orElse(null)
val processor = ehc?.findDeadLetterProcessor()
?: it.module
.getOptionalComponent(SequencedDeadLetterProcessor::class.java, ehcName)
.orElse(null)
?.let {
@Suppress("UNCHECKED_CAST")
it as SequencedDeadLetterProcessor<EventMessage>
}
if (processor == null) {
logger.warn(
"Skipping DLQ for component [{}] — could not resolve a SequencedDeadLetterProcessor " +
"from the registered EventHandlingComponent. Retry/process actions will be " +
"unavailable for this component; deadLetters and delete remain functional.",
ehcName
)
return@mapNotNull null
}
DlqEntry(
processingGroup = if (perProcessor[it.processor] == 1) it.processor else "${it.processor}::${it.component}",
processorName = it.processor,
Expand All @@ -386,6 +389,25 @@ class DeadLetterManager @JvmOverloads constructor(
}
}

@Suppress("UNCHECKED_CAST")
private fun EventHandlingComponent.findDeadLetterProcessor(): SequencedDeadLetterProcessor<EventMessage>? {
var current: Any? = this
val seen = mutableSetOf<Any>()
while (current != null && seen.add(current)) {
if (current is SequencedDeadLetterProcessor<*>) {
return current as SequencedDeadLetterProcessor<EventMessage>
}
current = when (current) {
is AxoniqPlatformEventHandlingComponent -> current.delegate
else -> current.javaClass.declaredFields
.firstOrNull { EventHandlingComponent::class.java.isAssignableFrom(it.type) }
?.also { it.isAccessible = true }
?.get(current)
}
}
return null
}

private fun discover(): List<DlqEntry> = entries ?: discoverEntries().also { entries = it }

/**
Expand Down