Skip to content

Commit b6b400d

Browse files
authored
Merge pull request #1675 from danielmarbach/channel-base
Sequence Number non-blocking
2 parents 1896a62 + 5326343 commit b6b400d

File tree

9 files changed

+58
-58
lines changed

9 files changed

+58
-58
lines changed

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,9 +440,9 @@ RabbitMQ.Client.IChannel.CurrentQueue.get -> string
440440
RabbitMQ.Client.IChannel.DefaultConsumer.get -> RabbitMQ.Client.IAsyncBasicConsumer
441441
RabbitMQ.Client.IChannel.DefaultConsumer.set -> void
442442
RabbitMQ.Client.IChannel.FlowControl -> System.EventHandler<RabbitMQ.Client.Events.FlowControlEventArgs>
443+
RabbitMQ.Client.IChannel.GetNextPublishSequenceNumberAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<ulong>
443444
RabbitMQ.Client.IChannel.IsClosed.get -> bool
444445
RabbitMQ.Client.IChannel.IsOpen.get -> bool
445-
RabbitMQ.Client.IChannel.NextPublishSeqNo.get -> ulong
446446
RabbitMQ.Client.IChannelExtensions
447447
RabbitMQ.Client.IConnection
448448
RabbitMQ.Client.IConnection.CallbackException -> System.EventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>

projects/RabbitMQ.Client/client/api/IChannel.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,6 @@ public interface IChannel : IDisposable
9595
/// </summary>
9696
bool IsOpen { get; }
9797

98-
/// <summary>
99-
/// When in confirm mode, return the sequence number of the next message to be published.
100-
/// </summary>
101-
ulong NextPublishSeqNo { get; }
102-
10398
/// <summary>
10499
/// The name of the last queue declared on this channel.
105100
/// </summary>
@@ -143,6 +138,11 @@ public interface IChannel : IDisposable
143138
/// </remarks>
144139
event EventHandler<ShutdownEventArgs> ChannelShutdown;
145140

141+
/// <summary>
142+
/// When in confirm mode, return the sequence number of the next message to be published.
143+
/// </summary>
144+
ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default);
145+
146146
/// <summary>Asynchronously acknknowledges one or more messages.</summary>
147147
/// <param name="deliveryTag">The delivery tag.</param>
148148
/// <param name="multiple">Ack all messages up to the delivery tag if set to <c>true</c>.</param>

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,11 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
8989
}
9090
case ProtocolCommandId.BasicAck:
9191
{
92-
HandleBasicAck(cmd);
93-
return Task.FromResult(true);
92+
return HandleBasicAck(cmd, cancellationToken);
9493
}
9594
case ProtocolCommandId.BasicNack:
9695
{
97-
HandleBasicNack(cmd);
98-
return Task.FromResult(true);
96+
return HandleBasicNack(cmd, cancellationToken);
9997
}
10098
case ProtocolCommandId.BasicReturn:
10199
{

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,6 @@ public IAsyncBasicConsumer? DefaultConsumer
144144

145145
public bool IsOpen => !_disposed && _innerChannel.IsOpen;
146146

147-
public ulong NextPublishSeqNo => InnerChannel.NextPublishSeqNo;
148-
149147
public string? CurrentQueue => InnerChannel.CurrentQueue;
150148

151149
internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers,
@@ -274,6 +272,8 @@ public void Dispose()
274272
_disposed = true;
275273
}
276274

275+
public ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) => InnerChannel.GetNextPublishSequenceNumberAsync(cancellationToken);
276+
277277
public ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, CancellationToken cancellationToken)
278278
=> InnerChannel.BasicAckAsync(deliveryTag, multiple, cancellationToken);
279279

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -181,29 +181,6 @@ public IAsyncBasicConsumer? DefaultConsumer
181181
[MemberNotNullWhen(false, nameof(CloseReason))]
182182
public bool IsOpen => CloseReason is null;
183183

184-
public ulong NextPublishSeqNo
185-
{
186-
get
187-
{
188-
if (ConfirmsAreEnabled)
189-
{
190-
_confirmSemaphore.Wait();
191-
try
192-
{
193-
return _nextPublishSeqNo;
194-
}
195-
finally
196-
{
197-
_confirmSemaphore.Release();
198-
}
199-
}
200-
else
201-
{
202-
return _nextPublishSeqNo;
203-
}
204-
}
205-
}
206-
207184
public string? CurrentQueue { get; private set; }
208185

209186
public ISession Session { get; private set; }
@@ -589,7 +566,7 @@ public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heart
589566
return ModelSendAsync(method, cancellationToken).AsTask();
590567
}
591568

592-
protected void HandleBasicAck(IncomingCommand cmd)
569+
protected async Task<bool> HandleBasicAck(IncomingCommand cmd, CancellationToken cancellationToken)
593570
{
594571
var ack = new BasicAck(cmd.MethodSpan);
595572
if (!_basicAcksWrapper.IsEmpty)
@@ -598,10 +575,12 @@ protected void HandleBasicAck(IncomingCommand cmd)
598575
_basicAcksWrapper.Invoke(this, args);
599576
}
600577

601-
HandleAckNack(ack._deliveryTag, ack._multiple, false);
578+
await HandleAckNack(ack._deliveryTag, ack._multiple, false, cancellationToken)
579+
.ConfigureAwait(false);
580+
return true;
602581
}
603582

604-
protected void HandleBasicNack(IncomingCommand cmd)
583+
protected async Task<bool> HandleBasicNack(IncomingCommand cmd, CancellationToken cancellationToken)
605584
{
606585
var nack = new BasicNack(cmd.MethodSpan);
607586
if (!_basicNacksWrapper.IsEmpty)
@@ -611,7 +590,9 @@ protected void HandleBasicNack(IncomingCommand cmd)
611590
_basicNacksWrapper.Invoke(this, args);
612591
}
613592

614-
HandleAckNack(nack._deliveryTag, nack._multiple, true);
593+
await HandleAckNack(nack._deliveryTag, nack._multiple, true, cancellationToken)
594+
.ConfigureAwait(false);
595+
return true;
615596
}
616597

617598
protected async Task<bool> HandleBasicCancelAsync(IncomingCommand cmd, CancellationToken cancellationToken)
@@ -801,6 +782,26 @@ protected void HandleConnectionUnblocked()
801782
Session.Connection.HandleConnectionUnblocked();
802783
}
803784

785+
public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default)
786+
{
787+
if (ConfirmsAreEnabled)
788+
{
789+
await _confirmSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
790+
try
791+
{
792+
return _nextPublishSeqNo;
793+
}
794+
finally
795+
{
796+
_confirmSemaphore.Release();
797+
}
798+
}
799+
else
800+
{
801+
return _nextPublishSeqNo;
802+
}
803+
}
804+
804805
public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple,
805806
CancellationToken cancellationToken);
806807

@@ -1829,7 +1830,7 @@ await tokenRegistration.DisposeAsync()
18291830

18301831
// NOTE: this method is internal for its use in this test:
18311832
// TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse
1832-
internal void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack)
1833+
internal async Task HandleAckNack(ulong deliveryTag, bool multiple, bool isNack, CancellationToken cancellationToken = default)
18331834
{
18341835
// Only do this if confirms are enabled *and* the library is tracking confirmations
18351836
if (ConfirmsAreEnabled && _trackConfirmations)
@@ -1839,7 +1840,8 @@ internal void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack)
18391840
throw new InvalidOperationException(InternalConstants.BugFound);
18401841
}
18411842
// let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted
1842-
_confirmSemaphore.Wait();
1843+
await _confirmSemaphore.WaitAsync(cancellationToken)
1844+
.ConfigureAwait(false);
18431845
try
18441846
{
18451847
// No need to do anything if there are no delivery tags in the list

projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ void CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
173173
{
174174
string msg = i.ToString();
175175
byte[] body = Encoding.UTF8.GetBytes(msg);
176-
ulong nextPublishSeqNo = channel.NextPublishSeqNo;
176+
ulong nextPublishSeqNo = await channel.GetNextPublishSequenceNumberAsync();
177177
if ((ulong)(i + 1) != nextPublishSeqNo)
178178
{
179179
Console.WriteLine($"{DateTime.Now} [WARNING] i {i + 1} does not equal next sequence number: {nextPublishSeqNo}");

projects/Test/Integration/TestConfirmSelect.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,19 +53,19 @@ ValueTask PublishAsync()
5353
}
5454

5555
await _channel.ConfirmSelectAsync();
56-
Assert.Equal(1ul, _channel.NextPublishSeqNo);
56+
Assert.Equal(1ul, await _channel.GetNextPublishSequenceNumberAsync());
5757
await PublishAsync();
58-
Assert.Equal(2ul, _channel.NextPublishSeqNo);
58+
Assert.Equal(2ul, await _channel.GetNextPublishSequenceNumberAsync());
5959
await PublishAsync();
60-
Assert.Equal(3ul, _channel.NextPublishSeqNo);
60+
Assert.Equal(3ul, await _channel.GetNextPublishSequenceNumberAsync());
6161

6262
await _channel.ConfirmSelectAsync();
6363
await PublishAsync();
64-
Assert.Equal(4ul, _channel.NextPublishSeqNo);
64+
Assert.Equal(4ul, await _channel.GetNextPublishSequenceNumberAsync());
6565
await PublishAsync();
66-
Assert.Equal(5ul, _channel.NextPublishSeqNo);
66+
Assert.Equal(5ul, await _channel.GetNextPublishSequenceNumberAsync());
6767
await PublishAsync();
68-
Assert.Equal(6ul, _channel.NextPublishSeqNo);
68+
Assert.Equal(6ul, await _channel.GetNextPublishSequenceNumberAsync());
6969
}
7070

7171
[Theory]
@@ -80,7 +80,7 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength)
8080
await _channel.ConfirmSelectAsync();
8181

8282
var properties = new BasicProperties();
83-
// _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo);
83+
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
8484
await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
8585
mandatory: false, basicProperties: properties, body: body);
8686
await _channel.WaitForConfirmsOrDieAsync();
@@ -91,7 +91,7 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
9191
{
9292
CorrelationId = new string('o', correlationIdLength)
9393
};
94-
// _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo);
94+
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
9595
await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body);
9696
await _channel.WaitForConfirmsOrDieAsync();
9797
}
@@ -101,7 +101,7 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
101101
}
102102

103103
properties = new BasicProperties();
104-
// _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo);
104+
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
105105
await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body);
106106
await _channel.WaitForConfirmsOrDieAsync();
107107
// _output.WriteLine("I'm done...");

projects/Test/Integration/TestConfirmSelectAsync.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,19 +49,19 @@ public TestConfirmSelectAsync(ITestOutputHelper output) : base(output)
4949
public async Task TestConfirmSelectIdempotency()
5050
{
5151
await _channel.ConfirmSelectAsync();
52-
Assert.Equal(1ul, _channel.NextPublishSeqNo);
52+
Assert.Equal(1ul, await _channel.GetNextPublishSequenceNumberAsync());
5353
await Publish();
54-
Assert.Equal(2ul, _channel.NextPublishSeqNo);
54+
Assert.Equal(2ul, await _channel.GetNextPublishSequenceNumberAsync());
5555
await Publish();
56-
Assert.Equal(3ul, _channel.NextPublishSeqNo);
56+
Assert.Equal(3ul, await _channel.GetNextPublishSequenceNumberAsync());
5757

5858
await _channel.ConfirmSelectAsync();
5959
await Publish();
60-
Assert.Equal(4ul, _channel.NextPublishSeqNo);
60+
Assert.Equal(4ul, await _channel.GetNextPublishSequenceNumberAsync());
6161
await Publish();
62-
Assert.Equal(5ul, _channel.NextPublishSeqNo);
62+
Assert.Equal(5ul, await _channel.GetNextPublishSequenceNumberAsync());
6363
await Publish();
64-
Assert.Equal(6ul, _channel.NextPublishSeqNo);
64+
Assert.Equal(6ul, await _channel.GetNextPublishSequenceNumberAsync());
6565
}
6666

6767
private ValueTask Publish()

projects/Test/Integration/TestPublisherConfirms.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout
105105
return TestWaitForConfirmsAsync(2000, async (ch) =>
106106
{
107107
RecoveryAwareChannel actualChannel = ((AutorecoveringChannel)ch).InnerChannel;
108-
actualChannel.HandleAckNack(10UL, false, true);
108+
await actualChannel.HandleAckNack(10UL, false, true);
109109
using (var cts = new CancellationTokenSource(ShortSpan))
110110
{
111111
Assert.False(await ch.WaitForConfirmsAsync(cts.Token));

0 commit comments

Comments
 (0)