Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
import com.datastax.oss.driver.api.core.DriverExecutionException;
import com.datastax.oss.driver.shaded.guava.common.base.Preconditions;
import com.datastax.oss.driver.shaded.guava.common.base.Throwables;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

Expand Down Expand Up @@ -164,6 +167,47 @@ public static <T> T getUninterruptibly(CompletionStage<T> stage) {
}
}

/**
* Get the result of a future uninterruptibly, with a timeout.
*
* @param stage the completion stage to wait for
* @param timeout the maximum time to wait
* @return the result value
* @throws DriverExecutionException if the future completed exceptionally
* @throws DriverExecutionException wrapping TimeoutException if the wait timed out
*/
public static <T> T getUninterruptibly(CompletionStage<T> stage, Duration timeout) {
boolean interrupted = false;
try {
long remainingNanos = timeout.toNanos();
long deadline = System.nanoTime() + remainingNanos;
while (true) {
try {
return stage.toCompletableFuture().get(remainingNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
interrupted = true;
remainingNanos = deadline - System.nanoTime();
if (remainingNanos <= 0) {
throw new DriverExecutionException(new TimeoutException("Timed out after interrupt"));
}
} catch (TimeoutException e) {
throw new DriverExecutionException(e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof DriverException) {
throw ((DriverException) cause).copy();
}
Throwables.throwIfUnchecked(cause);
throw new DriverExecutionException(cause);
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

/**
* Executes a function on the calling thread and returns result in a {@link CompletableFuture}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
package com.datastax.oss.driver.internal.core.util.concurrent;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.fail;

import com.datastax.oss.driver.api.core.DriverExecutionException;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Test;

public class CompletableFuturesTest {
Expand All @@ -45,4 +49,28 @@ public void should_not_suppress_identical_exceptions() throws Exception {
assertThat(e.getCause()).isEqualTo(error);
}
}

@Test
public void should_get_uninterruptibly_with_timeout_on_completed_future() {
CompletableFuture<String> future = CompletableFuture.completedFuture("result");
String result = CompletableFutures.getUninterruptibly(future, Duration.ofSeconds(1));
assertThat(result).isEqualTo("result");
}

@Test
public void should_timeout_on_incomplete_future() {
CompletableFuture<String> future = new CompletableFuture<>();
assertThatThrownBy(() -> CompletableFutures.getUninterruptibly(future, Duration.ofMillis(100)))
.isInstanceOf(DriverExecutionException.class)
.hasCauseInstanceOf(TimeoutException.class);
}

@Test
public void should_propagate_exception_with_timeout() {
CompletableFuture<String> future = new CompletableFuture<>();
RuntimeException error = new RuntimeException("test error");
future.completeExceptionally(error);
assertThatThrownBy(() -> CompletableFutures.getUninterruptibly(future, Duration.ofSeconds(1)))
.isEqualTo(error);
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
<skipUnitTests>${skipTests}</skipUnitTests>
<release.autopublish>false</release.autopublish>
<pushChanges>false</pushChanges>
<mockitoopens.argline />
<mockitoopens.argline/>
</properties>
<dependencyManagement>
<dependencies>
Expand Down