Skip to content

Commit 7e5c2c1

Browse files
committed
* Add delay if outstanding confirms is more than 50% of max.
1 parent 197a22b commit 7e5c2c1

File tree

3 files changed

+38
-19
lines changed

3 files changed

+38
-19
lines changed

projects/RabbitMQ.Client/CreateChannelOptions.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ public sealed class CreateChannelOptions
1616
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;
1717

1818
/// <summary>
19-
/// The number of allowed outstanding publisher confirmations before publishing is blocked.
19+
/// If publisher confirmation tracking is enabled, this represents the number of allowed
20+
/// outstanding publisher confirmations before publishing is blocked.
2021
///
2122
/// Defaults to <c>128</c>
2223
///

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled,
125125
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
126126
_maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;
127127

128-
if (_maxOutstandingPublisherConfirmations is not null)
128+
if (_publisherConfirmationTrackingEnabled && _maxOutstandingPublisherConfirmations is not null)
129129
{
130130
_maxOutstandingConfirmationsSemaphore = new SemaphoreSlim(
131131
(int)_maxOutstandingPublisherConfirmations,
@@ -282,6 +282,18 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
282282
{
283283
if (_publisherConfirmationsEnabled)
284284
{
285+
if (_publisherConfirmationTrackingEnabled)
286+
{
287+
if (_maxOutstandingPublisherConfirmations is not null)
288+
{
289+
int percentOfMax = _confirmsTaskCompletionSources.Count / (int)_maxOutstandingPublisherConfirmations;
290+
if (percentOfMax > 0.5)
291+
{
292+
await Task.Delay(1000 * percentOfMax).ConfigureAwait(false);
293+
}
294+
}
295+
}
296+
285297
await _confirmSemaphore.WaitAsync(cancellationToken)
286298
.ConfigureAwait(false);
287299

@@ -292,12 +304,12 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
292304
{
293305
publisherConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
294306
_confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs;
295-
}
296307

297-
if (_maxOutstandingConfirmationsSemaphore is not null)
298-
{
299-
await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken)
300-
.ConfigureAwait(false);
308+
if (_maxOutstandingConfirmationsSemaphore is not null)
309+
{
310+
await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken)
311+
.ConfigureAwait(false);
312+
}
301313
}
302314

303315
_nextPublishSeqNo++;
@@ -346,7 +358,8 @@ await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
346358
.ConfigureAwait(false);
347359
}
348360

349-
if (_maxOutstandingConfirmationsSemaphore is not null)
361+
if (_publisherConfirmationTrackingEnabled &&
362+
_maxOutstandingConfirmationsSemaphore is not null)
350363
{
351364
_maxOutstandingConfirmationsSemaphore.Release();
352365
}

projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,18 @@
3838
using System.Threading.Tasks;
3939
using RabbitMQ.Client;
4040

41+
const ushort MAX_OUTSTANDING_CONFIRMS = 256;
42+
4143
const int MESSAGE_COUNT = 50_000;
4244
bool debug = false;
4345

46+
var channelOpts = new CreateChannelOptions
47+
{
48+
PublisherConfirmationsEnabled = true,
49+
PublisherConfirmationTrackingEnabled = true,
50+
MaxOutstandingPublisherConfirmations = MAX_OUTSTANDING_CONFIRMS
51+
};
52+
4453
#pragma warning disable CS8321 // Local function is declared but never used
4554

4655
await PublishMessagesIndividuallyAsync();
@@ -53,12 +62,12 @@ static Task<IConnection> CreateConnectionAsync()
5362
return factory.CreateConnectionAsync();
5463
}
5564

56-
static async Task PublishMessagesIndividuallyAsync()
65+
async Task PublishMessagesIndividuallyAsync()
5766
{
5867
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms per-message");
5968

6069
await using IConnection connection = await CreateConnectionAsync();
61-
await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
70+
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);
6271

6372
// declare a server-named queue
6473
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
@@ -85,18 +94,18 @@ static async Task PublishMessagesIndividuallyAsync()
8594
Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages individually in {sw.ElapsedMilliseconds:N0} ms");
8695
}
8796

88-
static async Task PublishMessagesInBatchAsync()
97+
async Task PublishMessagesInBatchAsync()
8998
{
9099
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches");
91100

92101
await using IConnection connection = await CreateConnectionAsync();
93-
await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
102+
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);
94103

95104
// declare a server-named queue
96105
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
97106
string queueName = queueDeclareResult.QueueName;
98107

99-
int batchSize = 1000;
108+
int batchSize = MAX_OUTSTANDING_CONFIRMS;
100109
int outstandingMessageCount = 0;
101110

102111
var sw = new Stopwatch();
@@ -154,12 +163,8 @@ async Task HandlePublishConfirmsAsynchronously()
154163

155164
await using IConnection connection = await CreateConnectionAsync();
156165

157-
var channelOptions = new CreateChannelOptions
158-
{
159-
PublisherConfirmationsEnabled = true,
160-
PublisherConfirmationTrackingEnabled = false
161-
};
162-
await using IChannel channel = await connection.CreateChannelAsync(channelOptions);
166+
channelOpts.PublisherConfirmationTrackingEnabled = false;
167+
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);
163168

164169
// declare a server-named queue
165170
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();

0 commit comments

Comments
 (0)