Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.springframework.modulith.events;

import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

Expand Down Expand Up @@ -73,6 +74,12 @@ default ApplicationEvent getApplicationEvent() {
*/
Optional<Instant> getCompletionDate();

/**
* Returns the list of failed attempts to publish the event
* @return will never be {@literal null}.
*/
List<FailedAttemptInfo> getFailedAttempts();

/**
* Returns whether the publication of the event has completed.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.springframework.modulith.events;

import java.time.Instant;

public interface FailedAttemptInfo {
/**
* Returns the time the event is published at.
*
* @return will never be {@literal null}.
*/
Instant getPublicationDate();

/**
* Returns the exception causing the publication to fail
*
* @return will never be {@literal null}.
*/
Throwable getFailureReason();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,12 @@ interface Completable {
* @param instant must not be {@literal null}.
*/
void markCompleted(Instant instant);

/**
* Stores the reason why the publication failed
*
* @param instant must not be {@literal null}.
* @param exception must not be {@literal null}.
*/
void markFailed(Instant instant, Throwable exception);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,20 @@
package org.springframework.modulith.events.core;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

import org.springframework.lang.Nullable;
import org.springframework.modulith.events.FailedAttemptInfo;
import org.springframework.util.Assert;

import static java.util.Collections.unmodifiableList;

/**
* Default {@link Completable} implementation.
*
Expand All @@ -36,6 +43,7 @@ class DefaultEventPublication implements TargetEventPublication {
private final Instant publicationDate;

private Optional<Instant> completionDate;
private List<FailedAttemptInfo> failedAttempts;

/**
* Creates a new {@link DefaultEventPublication} for the given event and {@link PublicationTargetIdentifier}.
Expand Down Expand Up @@ -100,6 +108,11 @@ public Optional<Instant> getCompletionDate() {
return completionDate;
}

@Override
public List<FailedAttemptInfo> getFailedAttempts() {
return failedAttempts == null ? List.of() : unmodifiableList(failedAttempts);
}

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.CompletableEventPublication#markCompleted(java.time.Instant)
Expand All @@ -109,6 +122,15 @@ public void markCompleted(Instant instant) {
this.completionDate = Optional.of(instant);
}


@Override
public void markFailed(Instant instant, Throwable exception) {
if (failedAttempts == null) {
failedAttempts = new ArrayList<>();
}
failedAttempts.add(new DefaultFailedAttemptInfo(instant, exception));
}

/*
* (non-Javadoc)
* @see java.lang.Object#toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void markCompleted(Object event, PublicationTargetIdentifier targetIdenti
* @see org.springframework.modulith.events.core.EventPublicationRegistry#markFailed(java.lang.Object, org.springframework.modulith.events.core.PublicationTargetIdentifier)
*/
@Override
public void markFailed(Object event, PublicationTargetIdentifier targetIdentifier) {
public void markFailed(Object event, PublicationTargetIdentifier targetIdentifier, Throwable o_O) {
inProgress.unregister(event, targetIdentifier);
}

Expand Down Expand Up @@ -266,14 +266,14 @@ PublicationsInProgress getPublicationsInProgress() {
* Marks the given {@link TargetEventPublication} as failed.
*
* @param publication must not be {@literal null}.
* @see #markFailed(Object, PublicationTargetIdentifier)
* @see #markFailed(Object, PublicationTargetIdentifier, Throwable)
* @since 1.3
*/
void markFailed(TargetEventPublication publication) {
void markFailed(TargetEventPublication publication, Throwable exception) {

Assert.notNull(publication, "TargetEventPublication must not be null!");

markFailed(publication.getEvent(), publication.getTargetIdentifier());
markFailed(publication.getEvent(), publication.getTargetIdentifier(), exception);
}

private static String getConfirmationMessage(Collection<?> publications) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2017-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.core;

import org.springframework.modulith.events.FailedAttemptInfo;

import java.time.Instant;

/**
* Default {@link FailedAttemptInfo} implementation.
* @param publicationDate - when the event failed to be published
* @param exception - the reason of publication failure
*/
record DefaultFailedAttemptInfo(Instant publicationDate, Throwable exception) implements FailedAttemptInfo {

@Override
public Instant getPublicationDate() {
return publicationDate;
}

@Override
public Throwable getFailureReason() {
return exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ public interface EventPublicationRegistry {
*
* @param event must not be {@literal null}.
* @param targetIdentifier must not be {@literal null}.
* @param exception cause of failing publication
* @since 1.3
*/
void markFailed(Object event, PublicationTargetIdentifier targetIdentifier);
void markFailed(Object event, PublicationTargetIdentifier targetIdentifier, Throwable exception);

/**
* Deletes all completed {@link TargetEventPublication}s that have been completed before the given {@link Duration}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ default void markCompleted(TargetEventPublication publication, Instant completio
*/
void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate);

/**
* Marks the publication for the given event and {@link PublicationTargetIdentifier} as failed.
*
* @param identifier must not be {@literal null}.
* @param exception cause of failing publication
* @since 1.3
*/
void markFailed(UUID identifier, Instant failedDate, Throwable exception);

/**
* Marks the publication with the given identifier completed at the given {@link Instant}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,15 @@ public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE + 10;
}

private void handleFailure(Method method, Object event, Throwable o_O) {
private void handleFailure(Method method, Object event, Throwable exception) {

markFailed(method, event);
markFailed(method, event, exception);

if (LOG.isDebugEnabled()) {
LOG.debug("Invocation of listener {} failed. Leaving event publication uncompleted.", method, o_O);
LOG.debug("Invocation of listener {} failed. Leaving event publication uncompleted.", method, exception);
} else {
LOG.info("Invocation of listener {} failed with message {}. Leaving event publication uncompleted.",
method, o_O.getMessage());
method, exception.getMessage());
}
}

Expand All @@ -224,12 +224,12 @@ private void markCompleted(Method method, Object event) {
registry.get().markCompleted(event, identifier);
}

private void markFailed(Method method, Object event) {
private void markFailed(Method method, Object event, Throwable exception) {

// Mark publication complete if the method is a transactional event listener.
// Mark publication failed if the method is a transactional event listener.
String adapterId = LISTENER_IDS.get(method);
PublicationTargetIdentifier identifier = PublicationTargetIdentifier.of(adapterId);
registry.get().markFailed(event, identifier);
registry.get().markFailed(event, identifier, exception);
}

@SuppressWarnings("null")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ void removesFailingResubmissionFromInProgressPublications() {

var registry = createRegistry(Instant.now());
var identifier = PublicationTargetIdentifier.of("id");
var error = new IllegalArgumentException("some error");

var failedPublications = registry.store(new Object(), Stream.of(identifier)).stream()
.peek(registry::markFailed)
.peek(e -> registry.markFailed(e, error))
.toList();

// Failed completions are not present in the in progress ones
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier,
.ifPresent(it -> it.markCompleted(completionDate));
}

@Override
public void markFailed(UUID identifier, Instant failedDate, Throwable exception) {

publications.stream()
.filter(it -> it.getIdentifier().equals(identifier))
.findFirst()
.ifPresent(it -> it.markFailed(failedDate, exception));
}

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.core.EventPublicationRepository#markCompleted(java.util.UUID, java.time.Instant)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.junit.jupiter.api.Test;

import java.time.Instant;

/**
* @author Oliver Drotbohm
* @author Björn Kieling
Expand Down Expand Up @@ -50,6 +52,7 @@ void publicationIsIncompleteByDefault() {

assertThat(publication.isCompleted()).isFalse();
assertThat(publication.getCompletionDate()).isNotPresent();
assertThat(publication.getFailedAttempts()).isEmpty();
}

@Test // GH-1056
Expand All @@ -64,6 +67,24 @@ void isOnlyAssociatedWithTheVerySameEventInstance() {
assertThat(publication.isAssociatedWith(first, identifier)).isTrue();
assertThat(publication.isAssociatedWith(second, identifier)).isFalse();
}
@Test
void isFailedAttemptStored() {

var first = new SampleEvent("Foo");

var identifier = PublicationTargetIdentifier.of("id");
var publication = TargetEventPublication.of(first, identifier);

assertThat(publication.getFailedAttempts()).isEmpty();

Instant failedInstant = Instant.now();
IllegalStateException reason = new IllegalStateException("test");

publication.markFailed(failedInstant, reason);

assertThat(publication.getFailedAttempts())
.contains(new DefaultFailedAttemptInfo(failedInstant, reason));
}

record SampleEvent(String payload) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,22 @@ void marksLazilyComputedCompletableFutureAsCompleted() throws Throwable {
verify(registry).markCompleted(any(), any());
}

@Test
void marksLazilyComputedCompletableFutureAsFailed() throws Throwable {

var result = createProxyFor(bean).asyncWithResult(true);

assertThat(result.isDone()).isFalse();
verify(registry, never()).markFailed(any(), any(), any());
verify(registry, never()).markCompleted(any(), any());

Thread.sleep(500);

assertThat(result.isCompletedExceptionally()).isTrue();
verify(registry).markFailed(any(), any(), any());
verify(registry, never()).markCompleted(any(), any());
}

@Test // GH-483
void exposesResultForCompletableFuture() throws Exception {

Expand Down
Loading