Skip to content

Commit 7f512d7

Browse files
authored
Merge pull request #1703 from rabbitmq/rabbitmq-dotnet-client-1682-outstanding-confirms
Enforce maximum outstanding publisher confirms, if set
2 parents 44dbd0a + 1660109 commit 7f512d7

9 files changed

+119
-41
lines changed

projects/RabbitMQ.Client/CreateChannelOptions.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,17 @@ public sealed class CreateChannelOptions
1515
/// </summary>
1616
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;
1717

18+
/// <summary>
19+
/// If publisher confirmation tracking is enabled, this represents the number of allowed
20+
/// outstanding publisher confirmations before publishing is blocked.
21+
///
22+
/// Defaults to <c>128</c>
23+
///
24+
/// Set to <c>null</c>, to allow an unlimited number of outstanding confirmations.
25+
///
26+
/// </summary>
27+
public ushort? MaxOutstandingPublisherConfirmations { get; set; } = 128;
28+
1829
/// <summary>
1930
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
2031
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.

projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
5151
private ushort _prefetchCountGlobal;
5252
private bool _publisherConfirmationsEnabled = false;
5353
private bool _publisherConfirmationTrackingEnabled = false;
54+
private ushort? _maxOutstandingPublisherConfirmations = null;
5455
private bool _usesTransactions;
5556
private ushort _consumerDispatchConcurrency;
5657

@@ -71,14 +72,20 @@ public TimeSpan ContinuationTimeout
7172
set => InnerChannel.ContinuationTimeout = value;
7273
}
7374

74-
public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel,
75-
ushort consumerDispatchConcurrency, bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled)
75+
// TODO just pass create channel options
76+
public AutorecoveringChannel(AutorecoveringConnection conn,
77+
RecoveryAwareChannel innerChannel,
78+
ushort consumerDispatchConcurrency,
79+
bool publisherConfirmationsEnabled,
80+
bool publisherConfirmationTrackingEnabled,
81+
ushort? maxOutstandingPublisherConfirmations)
7682
{
7783
_connection = conn;
7884
_innerChannel = innerChannel;
7985
_consumerDispatchConcurrency = consumerDispatchConcurrency;
8086
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
8187
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
88+
_maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;
8289
}
8390

8491
public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
@@ -164,8 +171,11 @@ internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection con
164171
_connection = conn;
165172

166173
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(
167-
_publisherConfirmationsEnabled, _publisherConfirmationTrackingEnabled,
168-
_consumerDispatchConcurrency, cancellationToken)
174+
_publisherConfirmationsEnabled,
175+
_publisherConfirmationTrackingEnabled,
176+
_maxOutstandingPublisherConfirmations,
177+
_consumerDispatchConcurrency,
178+
cancellationToken)
169179
.ConfigureAwait(false);
170180
newChannel.TakeOver(_innerChannel);
171181

projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,16 +184,21 @@ public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumerAs
184184

185185
public IProtocol Protocol => Endpoint.Protocol;
186186

187+
// TODO pass channel creation options?
187188
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(
188189
bool publisherConfirmationsEnabled = false,
189190
bool publisherConfirmationTrackingEnabled = false,
191+
ushort? maxOutstandingPublisherConfirmations = null,
190192
ushort? consumerDispatchConcurrency = null,
191193
CancellationToken cancellationToken = default)
192194
{
193195
ISession session = InnerConnection.CreateSession();
194196
var result = new RecoveryAwareChannel(_config, session, consumerDispatchConcurrency);
195197
return (RecoveryAwareChannel)await result.OpenAsync(
196-
publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled, cancellationToken)
198+
publisherConfirmationsEnabled,
199+
publisherConfirmationTrackingEnabled,
200+
maxOutstandingPublisherConfirmations,
201+
cancellationToken)
197202
.ConfigureAwait(false);
198203
}
199204

@@ -266,11 +271,20 @@ public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = d
266271
ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);
267272

268273
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(
269-
options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cdc, cancellationToken)
274+
options.PublisherConfirmationsEnabled,
275+
options.PublisherConfirmationTrackingEnabled,
276+
options.MaxOutstandingPublisherConfirmations,
277+
cdc,
278+
cancellationToken)
270279
.ConfigureAwait(false);
271280

272-
var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc,
273-
options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled);
281+
// TODO just pass create channel options
282+
var autorecoveringChannel = new AutorecoveringChannel(this,
283+
recoveryAwareChannel,
284+
cdc,
285+
options.PublisherConfirmationsEnabled,
286+
options.PublisherConfirmationTrackingEnabled,
287+
options.MaxOutstandingPublisherConfirmations);
274288
await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
275289
.ConfigureAwait(false);
276290
return autorecoveringChannel;

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
using System;
3333
using System.Collections.Generic;
3434
using System.Diagnostics;
35+
using System.Runtime.CompilerServices;
3536
using System.Threading;
3637
using System.Threading.Tasks;
3738
using RabbitMQ.Client.Framing;
@@ -41,6 +42,8 @@ namespace RabbitMQ.Client.Impl
4142
{
4243
internal partial class Channel : IChannel, IRecoverable
4344
{
45+
private readonly AsyncManualResetEvent _flowControlBlock = new(true);
46+
4447
public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
4548
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
4649
CancellationToken cancellationToken = default)
@@ -53,7 +56,7 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
5356
await MaybeStartPublisherConfirmationTracking(cancellationToken)
5457
.ConfigureAwait(false);
5558

56-
await EnforceFlowControlAsync(cancellationToken)
59+
await MaybeEnforceFlowControlAsync(cancellationToken)
5760
.ConfigureAwait(false);
5861

5962
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
@@ -108,7 +111,7 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
108111
await MaybeStartPublisherConfirmationTracking(cancellationToken)
109112
.ConfigureAwait(false);
110113

111-
await EnforceFlowControlAsync(cancellationToken)
114+
await MaybeEnforceFlowControlAsync(cancellationToken)
112115
.ConfigureAwait(false);
113116

114117
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
@@ -220,5 +223,16 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
220223
}
221224
}
222225
}
226+
227+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
228+
private ValueTask MaybeEnforceFlowControlAsync(CancellationToken cancellationToken)
229+
{
230+
if (_flowControlBlock.IsSet)
231+
{
232+
return default;
233+
}
234+
235+
return _flowControlBlock.WaitAsync(cancellationToken);
236+
}
223237
}
224238
}

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ internal partial class Channel : IChannel, IRecoverable
4747
{
4848
private bool _publisherConfirmationsEnabled = false;
4949
private bool _publisherConfirmationTrackingEnabled = false;
50+
private ushort? _maxOutstandingPublisherConfirmations = null;
51+
private SemaphoreSlim? _maxOutstandingConfirmationsSemaphore;
5052
private ulong _nextPublishSeqNo = 0;
5153
private readonly SemaphoreSlim _confirmSemaphore = new(1, 1);
5254
private readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> _confirmsTaskCompletionSources = new();
@@ -115,10 +117,20 @@ public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToke
115117
}
116118
}
117119

118-
private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled)
120+
private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled,
121+
bool publisherConfirmationTrackingEnabled,
122+
ushort? maxOutstandingPublisherConfirmations)
119123
{
120124
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
121125
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
126+
_maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;
127+
128+
if (_publisherConfirmationTrackingEnabled && _maxOutstandingPublisherConfirmations is not null)
129+
{
130+
_maxOutstandingConfirmationsSemaphore = new SemaphoreSlim(
131+
(int)_maxOutstandingPublisherConfirmations,
132+
(int)_maxOutstandingPublisherConfirmations);
133+
}
122134
}
123135

124136
private async Task MaybeConfirmSelect(CancellationToken cancellationToken)
@@ -270,6 +282,13 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
270282
{
271283
if (_publisherConfirmationsEnabled)
272284
{
285+
if (_publisherConfirmationTrackingEnabled &&
286+
_maxOutstandingConfirmationsSemaphore is not null)
287+
{
288+
await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken)
289+
.ConfigureAwait(false);
290+
}
291+
273292
await _confirmSemaphore.WaitAsync(cancellationToken)
274293
.ConfigureAwait(false);
275294

@@ -320,6 +339,12 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn
320339
{
321340
if (_publisherConfirmationsEnabled)
322341
{
342+
if (_publisherConfirmationTrackingEnabled &&
343+
_maxOutstandingConfirmationsSemaphore is not null)
344+
{
345+
_maxOutstandingConfirmationsSemaphore.Release();
346+
}
347+
323348
_confirmSemaphore.Release();
324349

325350
if (publisherConfirmationInfo is not null)

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ internal partial class Channel : IChannel, IRecoverable
5555
// AMQP only allows one RPC operation to be active at a time.
5656
protected readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1);
5757
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
58-
private readonly AsyncManualResetEvent _flowControlBlock = new AsyncManualResetEvent(true);
5958

6059
private ShutdownEventArgs? _closeReason;
6160
public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);
@@ -361,11 +360,14 @@ protected bool Enqueue(IRpcContinuation k)
361360
}
362361
}
363362

364-
internal async Task<IChannel> OpenAsync(bool publisherConfirmationsEnabled = false,
365-
bool publisherConfirmationTrackingEnabled = false,
366-
CancellationToken cancellationToken = default)
363+
internal async Task<IChannel> OpenAsync(bool publisherConfirmationsEnabled,
364+
bool publisherConfirmationTrackingEnabled,
365+
ushort? maxOutstandingPublisherConfirmations,
366+
CancellationToken cancellationToken)
367367
{
368-
ConfigurePublisherConfirmations(publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled);
368+
ConfigurePublisherConfirmations(publisherConfirmationsEnabled,
369+
publisherConfirmationTrackingEnabled,
370+
maxOutstandingPublisherConfirmations);
369371

370372
bool enqueued = false;
371373
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
@@ -450,17 +452,6 @@ protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THead
450452
return Session.TransmitAsync(in method, in header, body, cancellationToken);
451453
}
452454

453-
[MethodImpl(MethodImplOptions.AggressiveInlining)]
454-
protected ValueTask EnforceFlowControlAsync(CancellationToken cancellationToken)
455-
{
456-
if (_flowControlBlock.IsSet)
457-
{
458-
return default;
459-
}
460-
461-
return _flowControlBlock.WaitAsync(cancellationToken);
462-
}
463-
464455
internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args)
465456
{
466457
return _callbackExceptionAsyncWrapper.InvokeAsync(this, args);
@@ -540,7 +531,8 @@ protected virtual void Dispose(bool disposing)
540531

541532
ConsumerDispatcher.Dispose();
542533
_rpcSemaphore.Dispose();
543-
_confirmSemaphore?.Dispose();
534+
_confirmSemaphore.Dispose();
535+
_maxOutstandingConfirmationsSemaphore?.Dispose();
544536
}
545537
}
546538

@@ -561,7 +553,8 @@ protected virtual async ValueTask DisposeAsyncCore()
561553

562554
ConsumerDispatcher.Dispose();
563555
_rpcSemaphore.Dispose();
564-
_confirmSemaphore?.Dispose();
556+
_confirmSemaphore.Dispose();
557+
_maxOutstandingConfirmationsSemaphore?.Dispose();
565558
}
566559

567560
public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken)

projects/RabbitMQ.Client/Impl/Connection.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,11 @@ public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = d
273273

274274
// TODO channel CreateChannelAsync() to combine ctor and OpenAsync
275275
var channel = new Channel(_config, session, options.ConsumerDispatchConcurrency);
276-
IChannel ch = await channel.OpenAsync(options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cancellationToken)
276+
IChannel ch = await channel.OpenAsync(
277+
options.PublisherConfirmationsEnabled,
278+
options.PublisherConfirmationTrackingEnabled,
279+
options.MaxOutstandingPublisherConfirmations,
280+
cancellationToken)
277281
.ConfigureAwait(false);
278282
return ch;
279283
}

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ RabbitMQ.Client.CreateChannelOptions
33
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort?
44
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void
55
RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions() -> void
6+
RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.get -> ushort?
7+
RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.set -> void
68
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.get -> bool
79
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.set -> void
810
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.get -> bool

projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,18 @@
3838
using System.Threading.Tasks;
3939
using RabbitMQ.Client;
4040

41+
const ushort MAX_OUTSTANDING_CONFIRMS = 256;
42+
4143
const int MESSAGE_COUNT = 50_000;
4244
bool debug = false;
4345

46+
var channelOpts = new CreateChannelOptions
47+
{
48+
PublisherConfirmationsEnabled = true,
49+
PublisherConfirmationTrackingEnabled = true,
50+
MaxOutstandingPublisherConfirmations = MAX_OUTSTANDING_CONFIRMS
51+
};
52+
4453
#pragma warning disable CS8321 // Local function is declared but never used
4554

4655
await PublishMessagesIndividuallyAsync();
@@ -53,12 +62,12 @@ static Task<IConnection> CreateConnectionAsync()
5362
return factory.CreateConnectionAsync();
5463
}
5564

56-
static async Task PublishMessagesIndividuallyAsync()
65+
async Task PublishMessagesIndividuallyAsync()
5766
{
5867
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms per-message");
5968

6069
await using IConnection connection = await CreateConnectionAsync();
61-
await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
70+
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);
6271

6372
// declare a server-named queue
6473
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
@@ -85,18 +94,18 @@ static async Task PublishMessagesIndividuallyAsync()
8594
Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages individually in {sw.ElapsedMilliseconds:N0} ms");
8695
}
8796

88-
static async Task PublishMessagesInBatchAsync()
97+
async Task PublishMessagesInBatchAsync()
8998
{
9099
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches");
91100

92101
await using IConnection connection = await CreateConnectionAsync();
93-
await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
102+
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);
94103

95104
// declare a server-named queue
96105
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
97106
string queueName = queueDeclareResult.QueueName;
98107

99-
int batchSize = 1000;
108+
int batchSize = MAX_OUTSTANDING_CONFIRMS;
100109
int outstandingMessageCount = 0;
101110

102111
var sw = new Stopwatch();
@@ -154,12 +163,8 @@ async Task HandlePublishConfirmsAsynchronously()
154163

155164
await using IConnection connection = await CreateConnectionAsync();
156165

157-
var channelOptions = new CreateChannelOptions
158-
{
159-
PublisherConfirmationsEnabled = true,
160-
PublisherConfirmationTrackingEnabled = false
161-
};
162-
await using IChannel channel = await connection.CreateChannelAsync(channelOptions);
166+
channelOpts.PublisherConfirmationTrackingEnabled = false;
167+
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);
163168

164169
// declare a server-named queue
165170
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();

0 commit comments

Comments
 (0)