Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- Runtime dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.gridsuite.loadflow.server.config;

import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

@Configuration
public class RabbitConsumerConfig {
/*
* RabbitMQ consumer priority:
* https://www.rabbitmq.com/docs/consumer-priority
*
* Each container creates exactly one AMQP consumer with prefetch=1 and its own priority.
* When dispatching messages, RabbitMQ always selects the highest-priority consumer
* that is available.
*/
@Bean
public ListenerContainerCustomizer<MessageListenerContainer> customizer() {
/*
* Using AtomicInteger as in org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java
* We expect cloud stream to call our customizer exactly once in order for each container so it will produce a sequence of increasing priorities
*/
AtomicInteger index = new AtomicInteger();
return (container, destination, group) -> {
if (container instanceof SimpleMessageListenerContainer smlc && Objects.equals(group, "loadflowGroup")) {
smlc.setConsumerArguments(Map.of("x-priority", index.getAndIncrement()));
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -545,9 +545,33 @@ public static String getNextLimitName(LimitViolationInfos limitViolationInfos, N
return temporaryLimit != null ? temporaryLimit.getName() : null;
}

/*
* Spring Cloud Stream does not allow customizing each consumer within a single listener
* container (i.e. when concurrency = N)
*
* Since we need to customize each consumer individually, we simulate "concurrency = N"
* by creating N listener containers, each with concurrency = 1.
*
* This requires defining one Consumer bean per container, which explains
* the duplicated methods below.
*/
@Bean
@Override
public Consumer<Message<String>> consumeRun() {
public Consumer<Message<String>> consumeRun1() {
return super.consumeRun();
}

@Bean
public Consumer<Message<String>> consumeRun2() {
return super.consumeRun();
}

@Bean
public Consumer<Message<String>> consumeRun3() {
return super.consumeRun();
}

@Bean
public Consumer<Message<String>> consumeRun4() {
return super.consumeRun();
}

Expand Down
25 changes: 21 additions & 4 deletions src/main/resources/config/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,25 @@ spring:

cloud:
function:
definition: consumeRun;consumeCancel
definition: consumeRun1;consumeRun2;consumeRun3;consumeRun4;consumeCancel
stream:
bindings:
consumeRun-in-0:
# Spring Cloud Stream does not allow customizing each consumer within a single listener
# container (i.e. when concurrency = N)
#
# Since we need to customize each consumer individually, we simulate "concurrency = N"
# by creating N listener containers, each with concurrency = 1.
consumeRun1-in-0: &consumeRunConfig
destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run
group: loadflowGroup
consumer:
concurrency: 4
max-attempts: 1
consumeRun2-in-0:
<<: *consumeRunConfig
consumeRun3-in-0:
<<: *consumeRunConfig
consumeRun4-in-0:
<<: *consumeRunConfig
publishRun-out-0:
destination: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run
publishResult-out-0:
Expand All @@ -28,7 +38,8 @@ spring:
output-bindings: publishRun-out-0;publishResult-out-0;publishCancel-out-0;publishStopped-out-0;publishCancelFailed-out-0
rabbit:
bindings:
consumeRun-in-0:
# See comment on spring.cloud.stream.bindings.consumeRun1-in-0
consumeRun1-in-0: &consumeRunRabbitConfig
consumer:
auto-bind-dlq: true
dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}loadflow.run.dlx
Expand All @@ -37,6 +48,12 @@ spring:
quorum:
enabled: true
delivery-limit: 2
consumeRun2-in-0:
<<: *consumeRunRabbitConfig
consumeRun3-in-0:
<<: *consumeRunRabbitConfig
consumeRun4-in-0:
<<: *consumeRunRabbitConfig
powsybl-ws:
database:
name: loadflow
Expand Down
4 changes: 4 additions & 0 deletions src/test/resources/application-default.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ spring:
hibernate:
#to turn off schema validation that fails (because of clob types) and blocks tests even if the the schema is compatible
ddl-auto: none
cloud:
function:
# disable consumeRun2/3/4 during test - all of them receive the "loadflowGroup" messages otherwise
definition: consumeRun1;consumeCancel

logging:
level:
Expand Down