-
Notifications
You must be signed in to change notification settings - Fork 71
perf(flagd): speed up e2e test execution via container pool and parallel scenarios #1752
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
cca921a
41f983c
551159a
f9e647c
32940e9
6b334f5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| package dev.openfeature.contrib.providers.flagd.e2e; | ||
|
|
||
| import java.io.File; | ||
| import java.io.IOException; | ||
| import java.nio.file.Files; | ||
| import java.nio.file.Path; | ||
| import java.nio.file.Paths; | ||
| import java.time.Duration; | ||
| import org.apache.commons.io.FileUtils; | ||
| import org.apache.commons.lang3.RandomStringUtils; | ||
| import org.testcontainers.containers.ComposeContainer; | ||
| import org.testcontainers.containers.wait.strategy.Wait; | ||
|
|
||
| /** A single pre-warmed Docker Compose stack (flagd + envoy) and its associated temp directory. */ | ||
| public class ContainerEntry { | ||
|
|
||
| public static final int FORBIDDEN_PORT = 9212; | ||
|
|
||
| public final ComposeContainer container; | ||
| public final Path tempDir; | ||
|
|
||
| private ContainerEntry(ComposeContainer container, Path tempDir) { | ||
| this.container = container; | ||
| this.tempDir = tempDir; | ||
| } | ||
|
|
||
| /** Start a new container entry. Blocks until all services are ready. */ | ||
| public static ContainerEntry start() throws IOException { | ||
| Path tempDir = Files.createDirectories( | ||
| Paths.get("tmp/" + RandomStringUtils.randomAlphanumeric(8).toLowerCase() + "/")); | ||
|
|
||
| ComposeContainer container = new ComposeContainer(new File("test-harness/docker-compose.yaml")) | ||
| .withEnv("FLAGS_DIR", tempDir.toAbsolutePath().toString()) | ||
| .withExposedService("flagd", 8013, Wait.forListeningPort()) | ||
| .withExposedService("flagd", 8015, Wait.forListeningPort()) | ||
| .withExposedService("flagd", 8080, Wait.forListeningPort()) | ||
| .withExposedService("envoy", 9211, Wait.forListeningPort()) | ||
| .withExposedService("envoy", FORBIDDEN_PORT, Wait.forListeningPort()) | ||
| .withStartupTimeout(Duration.ofSeconds(45)); | ||
| container.start(); | ||
|
|
||
| return new ContainerEntry(container, tempDir); | ||
| } | ||
|
|
||
| /** Stop the container and clean up the temp directory. */ | ||
| public void stop() throws IOException { | ||
| container.stop(); | ||
| FileUtils.deleteDirectory(tempDir.toFile()); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| package dev.openfeature.contrib.providers.flagd.e2e; | ||
|
|
||
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
|
|
||
| /** | ||
| * A pool of pre-warmed {@link ContainerEntry} instances. | ||
| * | ||
| * <p>All containers are started in parallel during {@link #initialize()}, paying the ~45s Docker | ||
| * Compose startup cost only once. Scenarios borrow a container via {@link #acquire()} and return | ||
| * it via {@link #release(ContainerEntry)} after teardown, allowing the next scenario to reuse it | ||
| * immediately without any cold-start overhead. | ||
| * | ||
 | * <p>Pool size is controlled by the system property {@code flagd.e2e.pool.size} (default: the smaller of the available processor count and 4). | ||
| * | ||
| * <p>Multiple test classes may share the same JVM fork (Surefire {@code reuseForks=true}). Each | ||
| * class calls {@link #initialize()} and {@link #shutdown()} once. A reference counter ensures | ||
| * that containers are only started on the first {@code initialize()} call and only stopped when | ||
| * the last {@code shutdown()} call is made, preventing one class from destroying containers that | ||
| * are still in use by another class running concurrently in the same JVM. | ||
| */ | ||
| @Slf4j | ||
| public class ContainerPool { | ||
|
|
||
| private static final int POOL_SIZE = Integer.getInteger( | ||
| "flagd.e2e.pool.size", Math.min(Runtime.getRuntime().availableProcessors(), 4)); | ||
|
|
||
| private static final BlockingQueue<ContainerEntry> pool = new LinkedBlockingQueue<>(); | ||
| private static final List<ContainerEntry> all = new ArrayList<>(); | ||
| private static final java.util.concurrent.atomic.AtomicInteger refCount = | ||
| new java.util.concurrent.atomic.AtomicInteger(0); | ||
|
|
||
| public static void initialize() throws Exception { | ||
| if (refCount.getAndIncrement() > 0) { | ||
| log.info("Container pool already initialized (refCount={}), reusing existing pool.", refCount.get()); | ||
| return; | ||
| } | ||
| log.info("Starting container pool of size {}...", POOL_SIZE); | ||
| ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE); | ||
| try { | ||
| List<Future<ContainerEntry>> futures = new ArrayList<>(); | ||
| for (int i = 0; i < POOL_SIZE; i++) { | ||
| futures.add(executor.submit(ContainerEntry::start)); | ||
| } | ||
| for (Future<ContainerEntry> future : futures) { | ||
| ContainerEntry entry = future.get(); | ||
| pool.add(entry); | ||
| all.add(entry); | ||
| } | ||
| } catch (Exception e) { | ||
| // Stop any containers that started successfully before the failure | ||
| all.forEach(entry -> { | ||
| try { | ||
| entry.stop(); | ||
| } catch (IOException suppressed) { | ||
| e.addSuppressed(suppressed); | ||
| } | ||
| }); | ||
| pool.clear(); | ||
| all.clear(); | ||
| refCount.decrementAndGet(); | ||
| throw e; | ||
| } finally { | ||
| executor.shutdown(); | ||
| } | ||
| log.info("Container pool ready ({} containers).", POOL_SIZE); | ||
| } | ||
|
|
||
| public static void shutdown() { | ||
| int remaining = refCount.decrementAndGet(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could it be possible that all current users call
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is happening on the beforeAll - per test suites, tests suites are not parallel anyways, there is another improvement for this.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the test suites do not run in parallel, then we don't need this sync mechanism. If they do run in parallel, the scenrio in my comment could (even though it is unlikely) occur
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i ran it shortly in parallel, and the next iteration will add more flexibility to it, where all the tests are actually parallel. but this is in the next follow up pr. i shortly ran all 3 tests in parallel with some hacks, but it was not worth the effort. |
||
| if (remaining > 0) { | ||
| log.info("Container pool still in use by {} class(es), deferring shutdown.", remaining); | ||
| return; | ||
| } | ||
| log.info("Last shutdown call — stopping all containers."); | ||
| all.forEach(entry -> { | ||
| try { | ||
| entry.stop(); | ||
| } catch (IOException e) { | ||
| log.warn("Error stopping container entry", e); | ||
| } | ||
| }); | ||
| pool.clear(); | ||
| all.clear(); | ||
| } | ||
|
|
||
| /** | ||
| * Borrow a container from the pool, blocking until one becomes available. | ||
| * The caller MUST call {@link #release(ContainerEntry)} when done. | ||
| */ | ||
| public static ContainerEntry acquire() throws InterruptedException { | ||
| return pool.take(); | ||
| } | ||
|
|
||
| /** Return a container to the pool so the next scenario can use it. */ | ||
| public static void release(ContainerEntry entry) { | ||
| pool.add(entry); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -60,9 +60,18 @@ public void eventHandlerShouldBeExecutedWithin(String eventType, int ms) { | |
| .atMost(ms, MILLISECONDS) | ||
| .pollInterval(10, MILLISECONDS) | ||
| .until(() -> state.events.stream().anyMatch(event -> event.type.equals(eventType))); | ||
| state.lastEvent = state.events.stream() | ||
| .filter(event -> event.type.equals(eventType)) | ||
| .findFirst(); | ||
| state.events.clear(); | ||
| // Drain all events up to and including the first match. This ensures that | ||
| // older events (e.g. a READY from before a disconnect) cannot satisfy a | ||
| // later assertion that expects a *new* event of the same type, while still | ||
| // preserving events that arrived *after* the match for subsequent steps. | ||
| Event matched = null; | ||
| while (!state.events.isEmpty()) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we sure that no new events can be emitted while or immediately after this runs? Otherwise this loop might not be sufficient
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what do you mean, we want to specifically keep events which are generated while this loop runs. because an ready event can happen shortly after a disconnect, and while we wait for the disconnect including cleanup, we might even remove the new ready
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we are worried about events shortly after a disconnect, we should wait for some time and check events afterwards or in the meantime. This loop might be done after 0 or 1 iterations, and might be done before we receive such an event that we would want to wait for
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this cleans all the events which has happened till our matched event. so cleaning the list till out event. so that if there are events in the meantime, they stay in the list, and we can match in the next check against all of them. |
||
| Event head = state.events.poll(); | ||
| if (head != null && head.type.equals(eventType)) { | ||
| matched = head; | ||
| break; | ||
| } | ||
| } | ||
| state.lastEvent = java.util.Optional.ofNullable(matched); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The naming is not ideal, this is not the last event, it's the last event that matches the
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the last event is for the current test state, the last tracked event. we do not care about the type. this is only used to verify some event information. |
||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this needs to be a concurrent data structure too, so that we guarantee that all changes to the list are also visible to another thread calling shutdown