Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,21 @@
import com.powsybl.loadflow.LoadFlowProvider;
import com.powsybl.network.store.client.NetworkStoreService;
import com.powsybl.sensitivity.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.gridsuite.computation.dto.ReportInfos;
import org.gridsuite.computation.service.*;
import org.apache.commons.lang3.tuple.Pair;
import org.gridsuite.sensitivityanalysis.server.PropertyServerNameProvider;
import org.gridsuite.sensitivityanalysis.server.dto.SensitivityAnalysisInputData;
import org.gridsuite.sensitivityanalysis.server.dto.SensitivityAnalysisStatus;
import org.gridsuite.sensitivityanalysis.server.dto.parameters.SensitivityAnalysisParametersInfos;
import org.gridsuite.sensitivityanalysis.server.entities.AnalysisResultEntity;
import org.gridsuite.sensitivityanalysis.server.entities.ContingencyResultEntity;
import org.gridsuite.sensitivityanalysis.server.entities.SensitivityResultEntity;
import org.gridsuite.sensitivityanalysis.server.util.BatchAsyncPollerFactory;
import org.gridsuite.sensitivityanalysis.server.util.ScheduledThreadPoolFactory;
import org.gridsuite.sensitivityanalysis.server.util.SensitivityAnalysisRunnerSupplier;
import org.gridsuite.sensitivityanalysis.server.util.SensitivityResultWriterPersisted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.gridsuite.sensitivityanalysis.server.util.SensitivityResultPersistedWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
Expand All @@ -50,22 +51,20 @@
/**
* @author Franck Lecuyer <franck.lecuyer at rte-france.com>
*/
@Slf4j
@Service
public class SensitivityAnalysisWorkerService extends AbstractWorkerService<Boolean, SensitivityAnalysisRunContext, SensitivityAnalysisInputData, SensitivityAnalysisResultService> {
private static final Logger LOGGER = LoggerFactory.getLogger(SensitivityAnalysisWorkerService.class);
public static final String COMPUTATION_TYPE = "Sensitivity analysis";

public static final String COMPUTATION_TYPE = "Sensitivity analysis";
public static final int CONTINGENCY_RESULTS_BUFFER_SIZE = 128;

public static final int MAX_RESULTS_BUFFER_SIZE = 128;

protected final SensitivityAnalysisInMemoryObserver inMemoryObserver;
private final SensitivityAnalysisInputBuilderService sensitivityAnalysisInputBuilderService;

private final SensitivityAnalysisParametersService parametersService;

private final Function<String, SensitivityAnalysis.Runner> sensitivityAnalysisFactorySupplier;

protected final SensitivityAnalysisInMemoryObserver inMemoryObserver;
private final ScheduledThreadPoolFactory scheduledThreadPoolFactory;
private final BatchAsyncPollerFactory batchAsyncPollerFactory;

public SensitivityAnalysisWorkerService(NetworkStoreService networkStoreService,
ReportService reportService,
Expand All @@ -84,6 +83,8 @@ public SensitivityAnalysisWorkerService(NetworkStoreService networkStoreService,
this.parametersService = parametersService;
this.sensitivityAnalysisFactorySupplier = sensitivityAnalysisRunnerSupplier::getRunner;
this.inMemoryObserver = inMemoryObserver;
this.scheduledThreadPoolFactory = ScheduledThreadPoolFactory.getDefault();
this.batchAsyncPollerFactory = BatchAsyncPollerFactory.getDefault();
}

@Override
Expand Down Expand Up @@ -132,50 +133,21 @@ protected CompletableFuture<Boolean> getCompletableFuture(SensitivityAnalysisRun

saveSensitivityResults(groupedFactors, resultUuid, contingencies);

SensitivityResultWriterPersisted writer = new SensitivityResultWriterPersisted(resultUuid, resultService);
writer.start();

List<SensitivityFactor> factors = groupedFactors.stream().flatMap(Collection::stream).toList();
SensitivityFactorReader sensitivityFactorReader = new SensitivityFactorModelReader(factors, runContext.getNetwork());
CompletableFuture<Boolean> future = sensitivityAnalysisRunner.runAsync(
runContext.getNetwork(),
variantId,
sensitivityFactorReader,
writer,
contingencies,
runContext.getSensitivityAnalysisInputs().getVariablesSets(),
sensitivityAnalysisParameters,
executionService.getComputationManager(),
runContext.getReportNode())
.whenComplete((unused1, unused2) -> writer.setQueueProducerFinished())
.thenApply(unused -> {
try {
while (!writer.isConsumerFinished()) {
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
writer.interrupt();
// used to check if result is not null
return true;
})
.exceptionally(e -> {
LOGGER.error("Error occurred during computation", e);
runContext.getReportNode()
.newReportNode()
.withMessageTemplate("sensitivity.analysis.server.sensitivityComputationFailed")
.withUntypedValue("exception", e.getMessage())
.withSeverity(TypedValue.ERROR_SEVERITY)
.add();
writer.interrupt();
// null means it failed
return false;
});
if (resultUuid != null) {
futures.put(resultUuid, future);
}
return future;
SensitivityResultPersistedWriter sensitivityResultPersistedWriter = new SensitivityResultPersistedWriter(resultUuid, resultService, scheduledThreadPoolFactory, batchAsyncPollerFactory);

SensitivityAnalysisRunParameters runParameters = new SensitivityAnalysisRunParameters()
.setContingencies(contingencies)
.setVariableSets(runContext.getSensitivityAnalysisInputs().getVariablesSets())
.setParameters(sensitivityAnalysisParameters)
.setComputationManager(executionService.getComputationManager())
.setReportNode(runContext.getReportNode());

return sensitivityAnalysisRunner.runAsync(runContext.getNetwork(), variantId, sensitivityFactorReader, sensitivityResultPersistedWriter, runParameters)
.thenApply(unused -> Boolean.TRUE)
.whenComplete((result, throwable) -> syncWriterCompletion(throwable, sensitivityResultPersistedWriter))
.exceptionally(throwable -> handleAsyncError(throwable, runContext));
}

private void saveSensitivityResults(List<List<SensitivityFactor>> groupedFactors, UUID resultUuid, List<Contingency> contingencies) {
Expand Down Expand Up @@ -236,15 +208,15 @@ public SensitivityAnalysisResult run(UUID networkUuid, String variantId, ReportI
Thread.currentThread().interrupt();
return null;
} catch (Exception e) {
LOGGER.error(getFailedMessage(getComputationType()), e);
log.error(getFailedMessage(getComputationType()), e);
return null;
}
}

private SensitivityAnalysisResult runInMemory(SensitivityAnalysisRunContext runContext) throws Exception {
Objects.requireNonNull(runContext);

LOGGER.info("Run sensitivity analysis");
log.info("Run sensitivity analysis");

SensitivityAnalysis.Runner sensitivityAnalysisRunner = sensitivityAnalysisFactorySupplier.apply(runContext.getProvider());

Expand Down Expand Up @@ -313,4 +285,35 @@ private CompletableFuture<SensitivityAnalysisResult> runAsyncInMemory(Sensitivit
reporter);
return future.thenApply(r -> new SensitivityAnalysisResult(factors, writer.getContingencyStatuses(), writer.getValues()));
}

private void syncWriterCompletion(Throwable throwable, SensitivityResultPersistedWriter persistedWriter) {
try {
persistedWriter.notifyCompletion();

// success of the computation -> we wait for the writer to properly finish its job
// failure of the computation -> we don't wait for the writer : we don't need to persist results
if (throwable == null) {
persistedWriter.waitForCompletion();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
// writer can throw an exception if one or more results could not be persisted
throw new RuntimeException("Unexpected error during writer completion", e);
} finally {
persistedWriter.close();
}
}

private Boolean handleAsyncError(Throwable throwable, SensitivityAnalysisRunContext runContext) {
log.error("Error occurred during computation", throwable);
runContext.getReportNode()
.newReportNode()
.withMessageTemplate("sensitivity.analysis.server.sensitivityComputationFailed")
.withUntypedValue("exception", throwable.getMessage())
.withSeverity(TypedValue.ERROR_SEVERITY)
.add();
// false since the computation failed
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Copyright (c) 2026, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package org.gridsuite.sensitivityanalysis.server.util;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

/**
* @author Ghiles Abdellah {@literal <ghiles.abdellah at rte-france.com>}
*/
@Slf4j
public class BatchAsyncPoller<T> {

protected static final int BUFFER_SIZE = 512;
private static final int TASK_INITIAL_DELAY = 0;
private static final int TASK_DELAY = 100;

private final UUID resultUuid;
private final String taskName;
private final AtomicBoolean isProducerFinished;
private final BiConsumer<UUID, List<T>> batchHandlingFunction;

private final BlockingQueue<T> blockingQueue;
private final ScheduledFuture<?> pollingFuture;

public BatchAsyncPoller(ScheduledExecutorService scheduledExecutorService, UUID resultUuid,
String taskName, BiConsumer<UUID, List<T>> batchHandlingFunction) {
this.resultUuid = resultUuid;
this.taskName = taskName;
this.batchHandlingFunction = batchHandlingFunction;
this.isProducerFinished = new AtomicBoolean(false);

this.blockingQueue = new LinkedBlockingQueue<>();
this.pollingFuture = scheduledExecutorService.scheduleWithFixedDelay(this::drainQueue, TASK_INITIAL_DELAY, TASK_DELAY, TimeUnit.MILLISECONDS);
}

public void add(T data) {
// we check for :
// - pollingFuture.isDone() -> avoid storing data that will never be processed
// - isProducerFinished.get() -> since the producer is finished, the rest of the code can stop the data consumption at any given time
if (pollingFuture.isDone() || isProducerFinished.get()) {
throw new IllegalStateException("Cannot add data to a finished Poller");
}

blockingQueue.add(data);
}

public void notifyCompletion() {
isProducerFinished.set(true);
}

/**
* @throws InterruptedException - if the current thread was interrupted while waiting
* @throws ExecutionException - if one scheduled iteration failed
* @throws CancellationException - if the scheduled task was canceled abruptly
*/
public void waitForCompletion() throws InterruptedException, ExecutionException, CancellationException {
try {
pollingFuture.get();
} catch (CancellationException e) {
// Since CancellationException can be triggered either:
// - by the scheduler when the thread is interrupted, or
// - by the composition producer+consumer is finished,
// we need to check if the producer has finished
if (!hasFullyConsumedData()) {
throw e;
}
}
}

/**
* This method makes exceptions bubble if the `batchHandlingFunction` throws one.
* The goal is to stop the unnecessary consumption and allow the calling code to know that the process failed at one point.
* The scheduler will stop it and mark the future with an exception -> a call to `waitForCompletion` will then throw an `ExecutionException`
*/
private void drainQueue() {
List<T> buffer = new ArrayList<>(BUFFER_SIZE);

while (!shouldStop() && hasDrainedData(buffer)) {
log.debug("{} - Treating {} elements in the batch, {} elements remaining in the queue", taskName, buffer.size(), blockingQueue.size());
batchHandlingFunction.accept(resultUuid, new ArrayList<>(buffer));
buffer.clear();
}

if (shouldStop()) {
pollingFuture.cancel(false);
}
}

private boolean shouldStop() {
// Thread.currentThread().isInterrupted() check is mandatory for the loop since it doesn't have method calls that checks the flag
// hasFullyConsumedData() is also mandatory given the logic inside the calling method
// it allows to consume all data before leaving the calling loop (full drain)
return Thread.currentThread().isInterrupted() || hasFullyConsumedData();
}

private boolean hasFullyConsumedData() {
return isProducerFinished.get() && blockingQueue.isEmpty();
}

private boolean hasDrainedData(List<T> buffer) {
return blockingQueue.drainTo(buffer, BUFFER_SIZE) > 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright (c) 2026, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package org.gridsuite.sensitivityanalysis.server.util;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;

/**
* @author Ghiles Abdellah {@literal <ghiles.abdellah at rte-france.com>}
*/
public final class BatchAsyncPollerFactory {

public static BatchAsyncPollerFactory getDefault() {
return new BatchAsyncPollerFactory();
}

public <T> BatchAsyncPoller<T> create(ScheduledExecutorService scheduledExecutorService, UUID resultUuid,
String taskName, BiConsumer<UUID, List<T>> batchHandlingFunction) {
return new BatchAsyncPoller<>(scheduledExecutorService, resultUuid, taskName, batchHandlingFunction);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Copyright (c) 2026, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package org.gridsuite.sensitivityanalysis.server.util;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

/**
* @author Ghiles Abdellah {@literal <ghiles.abdellah at rte-france.com>}
*/
public final class ScheduledThreadPoolFactory {

public static ScheduledThreadPoolFactory getDefault() {
return new ScheduledThreadPoolFactory();
}

public ScheduledExecutorService create(int size, UUID threadPrefix) {
Objects.requireNonNull(threadPrefix);

ThreadFactory factory = new ThreadFactoryBuilder()
.setNameFormat(threadPrefix + "-%d")
.setDaemon(false)
.build();
return Executors.newScheduledThreadPool(size, factory);
}
}
Loading