Skip to content

Commit 0a11871

Browse files
committed
Support consumer priority
References rabbitmq/rabbitmq-server#11705
1 parent 63a09dd commit 0a11871

File tree

4 files changed

+90
-8
lines changed

4 files changed

+90
-8
lines changed

src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ public interface ConsumerBuilder {
2727

2828
ConsumerBuilder initialCredits(int initialCredits);
2929

30+
ConsumerBuilder priority(int priority);
31+
3032
ConsumerBuilder listeners(Resource.StateListener... listeners);
3133

3234
StreamOptions stream();

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.rabbitmq.client.amqp.AmqpException;
2323
import com.rabbitmq.client.amqp.Consumer;
2424
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
25-
import java.util.Collections;
2625
import java.util.Map;
2726
import java.util.concurrent.*;
2827
import java.util.concurrent.atomic.AtomicBoolean;
@@ -55,6 +54,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
5554
private final String address;
5655
private final String queue;
5756
private final Map<String, Object> filters;
57+
private final Map<String, Object> linkProperties;
5858
private final AmqpConnection connection;
5959
private final AtomicReference<PauseStatus> pauseStatus =
6060
new AtomicReference<>(PauseStatus.UNPAUSED);
@@ -83,11 +83,13 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
8383
addressBuilder.queue(builder.queue());
8484
this.address = addressBuilder.address();
8585
this.queue = builder.queue();
86-
this.filters = Collections.unmodifiableMap(builder.filters());
86+
this.filters = Map.copyOf(builder.filters());
87+
this.linkProperties = Map.copyOf(builder.properties());
8788
this.connection = builder.connection();
8889
this.sessionHandler = this.connection.createSessionHandler();
8990
this.nativeReceiver =
90-
this.createNativeReceiver(this.sessionHandler.session(), this.address, this.filters);
91+
this.createNativeReceiver(
92+
this.sessionHandler.session(), this.address, this.linkProperties, this.filters);
9193
this.initStateFromNativeReceiver(this.nativeReceiver);
9294
this.metricsCollector = this.connection.metricsCollector();
9395
this.startReceivingLoop();
@@ -144,14 +146,18 @@ public void close() {
144146
// internal API
145147

146148
private ClientReceiver createNativeReceiver(
147-
Session nativeSession, String address, Map<String, Object> filters) {
149+
Session nativeSession,
150+
String address,
151+
Map<String, Object> properties,
152+
Map<String, Object> filters) {
148153
try {
149154
ReceiverOptions receiverOptions =
150155
new ReceiverOptions()
151156
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
152157
.autoAccept(false)
153158
.autoSettle(false)
154-
.creditWindow(0);
159+
.creditWindow(0)
160+
.properties(properties);
155161
if (!filters.isEmpty()) {
156162
receiverOptions.sourceOptions().filters(filters);
157163
}
@@ -205,7 +211,8 @@ private void startReceivingLoop() {
205211

206212
void recoverAfterConnectionFailure() {
207213
this.nativeReceiver =
208-
createNativeReceiver(this.sessionHandler.sessionNoCheck(), this.address, this.filters);
214+
createNativeReceiver(
215+
this.sessionHandler.sessionNoCheck(), this.address, this.linkProperties, this.filters);
209216
this.initStateFromNativeReceiver(this.nativeReceiver);
210217
this.pauseStatus.set(PauseStatus.UNPAUSED);
211218
this.unsettledMessageCount.set(0);

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ class AmqpConsumerBuilder implements ConsumerBuilder {
3232
private Consumer.MessageHandler messageHandler;
3333
private int initialCredits = 100;
3434
private final List<Resource.StateListener> listeners = new ArrayList<>();
35-
private final Map<String, Object> filters = new HashMap<>();
35+
private final Map<String, Object> filters = new LinkedHashMap<>();
36+
private final Map<String, Object> properties = new LinkedHashMap<>();
3637
private final StreamOptions streamOptions = new DefaultStreamOptions(this, this.filters);
3738

3839
AmqpConsumerBuilder(AmqpConnection connection) {
@@ -57,6 +58,17 @@ public ConsumerBuilder initialCredits(int initialCredits) {
5758
return this;
5859
}
5960

61+
@Override
62+
public ConsumerBuilder priority(int priority) {
63+
if (priority < 0 || priority > 255) {
64+
throw new IllegalArgumentException(
65+
"The consumer priority must be between 0 and 255. "
66+
+ "Recommended values are between 0 and 5.");
67+
}
68+
this.properties.put("rabbitmq:priority", priority);
69+
return this;
70+
}
71+
6072
@Override
6173
public ConsumerBuilder listeners(Resource.StateListener... listeners) {
6274
if (listeners == null || listeners.length == 0) {
@@ -85,7 +97,11 @@ Consumer.MessageHandler messageHandler() {
8597
}
8698

8799
int initialCredits() {
88-
return initialCredits;
100+
return this.initialCredits;
101+
}
102+
103+
Map<String, Object> properties() {
104+
return this.properties;
89105
}
90106

91107
List<Resource.StateListener> listeners() {

src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.concurrent.atomic.AtomicInteger;
4141
import java.util.concurrent.atomic.AtomicReference;
4242
import java.util.function.Consumer;
43+
import java.util.stream.IntStream;
4344
import org.assertj.core.api.Assertions;
4445
import org.junit.jupiter.api.*;
4546
import org.junit.jupiter.api.extension.ExtendWith;
@@ -563,6 +564,62 @@ void consumerUnsettledMessagesGoBackToQueueAfterClosing() {
563564
.hasMessageCount(messageCount - settledCount);
564565
}
565566

567+
@Test
568+
void consumerWithHigherPriorityShouldGetMessagesFirst() {
569+
int messageCount = 100;
570+
connection.management().queue(name).exclusive(true).declare();
571+
AtomicInteger lowCount = new AtomicInteger(0);
572+
AtomicInteger highCount = new AtomicInteger(0);
573+
Sync consumeSync = sync(messageCount);
574+
com.rabbitmq.client.amqp.Consumer lowPriorityConsumer =
575+
connection
576+
.consumerBuilder()
577+
.queue(name)
578+
.priority(1)
579+
.messageHandler(
580+
(ctx, msg) -> {
581+
ctx.accept();
582+
lowCount.incrementAndGet();
583+
consumeSync.down();
584+
})
585+
.build();
586+
587+
com.rabbitmq.client.amqp.Consumer highPriorityConsumer =
588+
connection
589+
.consumerBuilder()
590+
.queue(name)
591+
.priority(5)
592+
.messageHandler(
593+
(ctx, msg) -> {
594+
ctx.accept();
595+
highCount.incrementAndGet();
596+
consumeSync.down();
597+
})
598+
.build();
599+
600+
Publisher publisher = connection.publisherBuilder().queue(name).build();
601+
Runnable publish =
602+
() ->
603+
IntStream.range(0, messageCount)
604+
.forEach(ignored -> publisher.publish(publisher.message(), ctx -> {}));
605+
606+
publish.run();
607+
608+
assertThat(consumeSync).completes();
609+
assertThat(lowCount).hasValue(0);
610+
assertThat(highCount).hasValue(messageCount);
611+
612+
highPriorityConsumer.close();
613+
614+
consumeSync.reset(messageCount);
615+
publish.run();
616+
assertThat(consumeSync).completes();
617+
assertThat(lowCount).hasValue(messageCount);
618+
assertThat(highCount).hasValue(messageCount);
619+
620+
lowPriorityConsumer.close();
621+
}
622+
566623
private static String uuid() {
567624
return UUID.randomUUID().toString();
568625
}

0 commit comments

Comments
 (0)