Skip to content

Commit e84b658

Browse files
authored
Merge pull request #1687 from rabbitmq/rabbitmq-dotnet-client-1682
Track publisher confirmations automatically
2 parents c82e567 + e93adfa commit e84b658

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+892
-858
lines changed

projects/RabbitMQ.Client/Constants.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,15 @@ public static class Constants
8787
/// <summary>
8888
/// The default consumer dispatch concurrency. See <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
8989
/// to set this value for every channel created on a connection,
90-
/// and <see cref="IConnection.CreateChannelAsync(ushort?, System.Threading.CancellationToken)"/>
90+
/// and <see cref="IConnection.CreateChannelAsync(CreateChannelOptions?, System.Threading.CancellationToken)" />
9191
/// for setting this value for a particular channel.
9292
/// </summary>
9393
public const ushort DefaultConsumerDispatchConcurrency = 1;
94+
95+
/// <summary>
96+
/// The message header used to track publish sequence numbers, to allow correlation when
97+
/// <c>basic.return</c> is sent via the broker.
98+
/// </summary>
99+
public const string PublishSequenceNumberHeader = "x-dotnet-pub-seq-no";
94100
}
95101
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
namespace RabbitMQ.Client
2+
{
3+
/// <summary>
4+
/// Channel creation options.
5+
/// </summary>
6+
public sealed class CreateChannelOptions
7+
{
8+
/// <summary>
9+
/// Enable or disable publisher confirmations on this channel. Defaults to <c>false</c>
10+
/// </summary>
11+
public bool PublisherConfirmationsEnabled { get; set; } = false;
12+
13+
/// <summary>
14+
/// Should this library track publisher confirmations for you? Defaults to <c>false</c>
15+
/// </summary>
16+
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;
17+
18+
/// <summary>
19+
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
20+
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
21+
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
22+
///
23+
/// Defaults to <c>null</c>, which will use the value from <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
24+
///
25+
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
26+
/// In addition to that consumers need to be thread/concurrency safe.
27+
/// </summary>
28+
public ushort? ConsumerDispatchConcurrency { get; set; } = null;
29+
30+
/// <summary>
31+
/// The default channel options.
32+
/// </summary>
33+
public static CreateChannelOptions Default { get; } = new CreateChannelOptions();
34+
}
35+
}

projects/RabbitMQ.Client/Events/ShutdownEventArgs.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string r
7272
/// <summary>
7373
/// Construct a <see cref="ShutdownEventArgs"/> with the given parameters.
7474
/// </summary>
75-
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, Exception exception, CancellationToken cancellationToken = default)
75+
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText,
76+
Exception exception, CancellationToken cancellationToken = default)
7677
: this(initiator, replyCode, replyText, 0, 0, cancellationToken: cancellationToken)
7778
{
7879
_exception = exception ?? throw new ArgumentNullException(nameof(exception));

projects/RabbitMQ.Client/Exceptions/OperationInterruptedException.cs

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -45,35 +45,30 @@ namespace RabbitMQ.Client.Exceptions
4545
public class OperationInterruptedException
4646
: RabbitMQClientException
4747
{
48-
///<summary>Construct an OperationInterruptedException with
49-
///the passed-in explanation, if any.</summary>
50-
public OperationInterruptedException(ShutdownEventArgs? reason)
51-
: base(reason is null ? "The AMQP operation was interrupted" :
52-
$"The AMQP operation was interrupted: {reason}")
48+
///<summary>
49+
///Construct an OperationInterruptedException
50+
///</summary>
51+
public OperationInterruptedException() : base("The AMQP operation was interrupted")
5352
{
54-
ShutdownReason = reason;
55-
}
5653

57-
///<summary>Construct an OperationInterruptedException with
58-
///the passed-in explanation and prefix, if any.</summary>
59-
public OperationInterruptedException(ShutdownEventArgs? reason, string prefix)
60-
: base(reason is null ? $"{prefix}: The AMQP operation was interrupted" :
61-
$"{prefix}: The AMQP operation was interrupted: {reason}")
62-
{
63-
ShutdownReason = reason;
6454
}
65-
66-
protected OperationInterruptedException()
55+
///<summary>
56+
///Construct an OperationInterruptedException with
57+
///the passed-in explanation, if any.
58+
///</summary>
59+
public OperationInterruptedException(ShutdownEventArgs reason)
60+
: base($"The AMQP operation was interrupted: {reason}", reason.Exception)
6761
{
62+
ShutdownReason = reason;
6863
}
6964

70-
protected OperationInterruptedException(string message) : base(message)
71-
{
72-
}
7365

74-
protected OperationInterruptedException(string message, Exception inner)
75-
: base(message, inner)
66+
///<summary>Construct an OperationInterruptedException with
67+
///the passed-in explanation and prefix, if any.</summary>
68+
public OperationInterruptedException(ShutdownEventArgs reason, string prefix)
69+
: base($"{prefix}: The AMQP operation was interrupted: {reason}", reason.Exception)
7670
{
71+
ShutdownReason = reason;
7772
}
7873

7974
///<summary>Retrieves the explanation for the shutdown. May

projects/RabbitMQ.Client/Exceptions/ProtocolViolationException.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,15 @@ namespace RabbitMQ.Client.Exceptions
3636
[Serializable]
3737
public class ProtocolViolationException : RabbitMQClientException
3838
{
39-
public ProtocolViolationException(string message) : base(message)
39+
public ProtocolViolationException() : base()
4040
{
4141
}
42-
public ProtocolViolationException(string message, Exception inner) : base(message, inner)
42+
43+
public ProtocolViolationException(string message) : base(message)
4344
{
4445
}
45-
public ProtocolViolationException()
46+
47+
public ProtocolViolationException(string message, Exception inner) : base(message, inner)
4648
{
4749
}
4850
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System;
33+
34+
namespace RabbitMQ.Client.Exceptions
35+
{
36+
/// <summary>
37+
/// Class for exceptions related to publisher confirmations
38+
/// or the <c>mandatory</c> flag.
39+
/// </summary>
40+
public class PublishException : RabbitMQClientException
41+
{
42+
private bool _isReturn = false;
43+
private ulong _publishSequenceNumber = ulong.MinValue;
44+
45+
public PublishException(ulong publishSequenceNumber, bool isReturn) : base()
46+
{
47+
if (publishSequenceNumber == ulong.MinValue)
48+
{
49+
throw new ArgumentException($"{nameof(publishSequenceNumber)} must not be 0");
50+
}
51+
52+
_isReturn = isReturn;
53+
_publishSequenceNumber = publishSequenceNumber;
54+
}
55+
56+
/// <summary>
57+
/// <c>true</c> if this exception is due to a <c>basic.return</c>
58+
/// </summary>
59+
public bool IsReturn => _isReturn;
60+
61+
/// <summary>
62+
/// Retrieve the publish sequence number.
63+
/// </summary>
64+
public ulong PublishSequenceNumber => _publishSequenceNumber;
65+
}
66+
}

projects/RabbitMQ.Client/Exceptions/RabbitMQClientException.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,24 +37,21 @@ namespace RabbitMQ.Client.Exceptions
3737
public abstract class RabbitMQClientException : Exception
3838
{
3939
/// <summary>Initializes a new instance of the <see cref="RabbitMQClientException" /> class.</summary>
40-
protected RabbitMQClientException()
40+
protected RabbitMQClientException() : base()
4141
{
42-
4342
}
4443

4544
/// <summary>Initializes a new instance of the <see cref="RabbitMQClientException" /> class with a specified error message.</summary>
4645
/// <param name="message">The message that describes the error. </param>
4746
protected RabbitMQClientException(string message) : base(message)
4847
{
49-
5048
}
5149

5250
/// <summary>Initializes a new instance of the <see cref="RabbitMQClientException" /> class with a specified error message and a reference to the inner exception that is the cause of this exception.</summary>
5351
/// <param name="message">The error message that explains the reason for the exception. </param>
5452
/// <param name="innerException">The exception that is the cause of the current exception, or a null reference (Nothing in Visual Basic) if no inner exception is specified. </param>
55-
protected RabbitMQClientException(string message, Exception innerException) : base(message, innerException)
53+
protected RabbitMQClientException(string message, Exception? innerException) : base(message, innerException)
5654
{
57-
5855
}
5956
}
6057
}

projects/RabbitMQ.Client/Exceptions/UnexpectedMethodException.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
namespace RabbitMQ.Client.Exceptions
3636
{
3737
/// <summary>
38+
/// TODO WHY IS THIS UNREFERENCED?
3839
/// Thrown when the channel receives an RPC reply that it wasn't expecting.
3940
/// </summary>
4041
[Serializable]

projects/RabbitMQ.Client/Framing/Channel.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
namespace RabbitMQ.Client.Framing
3737
{
38+
// TODO merge into ChannelBase
3839
internal class Channel : ChannelBase
3940
{
4041
public Channel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null)

projects/RabbitMQ.Client/IChannel.cs

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ Task<string> BasicConsumeAsync(string queue, bool autoAck, string consumerTag, b
204204
/// <param name="cancellationToken">CancellationToken for this operation.</param>
205205
/// <remarks>
206206
/// Routing key must be shorter than 255 bytes.
207+
/// Throws <see cref="Exceptions.PublishException"/> if a nack or basic.return is returned for the message.
207208
/// </remarks>
208209
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
209210
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
@@ -221,6 +222,7 @@ ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
221222
/// <param name="cancellationToken">CancellationToken for this operation.</param>
222223
/// <remarks>
223224
/// Routing key must be shorter than 255 bytes.
225+
/// Throws <see cref="Exceptions.PublishException"/> if a nack or basic.return is returned for the message.
224226
/// </remarks>
225227
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
226228
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
@@ -265,14 +267,6 @@ Task CloseAsync(ushort replyCode, string replyText, bool abort,
265267
Task CloseAsync(ShutdownEventArgs reason, bool abort,
266268
CancellationToken cancellationToken = default);
267269

268-
/// <summary>
269-
/// Asynchronously enable publisher confirmations.
270-
/// </summary>
271-
/// <param name="trackConfirmations">Set to <c>false</c> if tracking via <see cref="BasicAcksAsync"/> and <see cref="BasicNacksAsync"/> yourself.</param>
272-
/// <param name="cancellationToken">CancellationToken for this operation.</param>
273-
Task ConfirmSelectAsync(bool trackConfirmations = true,
274-
CancellationToken cancellationToken = default);
275-
276270
/// <summary>Asynchronously declare an exchange.</summary>
277271
/// <param name="exchange">The name of the exchange.</param>
278272
/// <param name="type">The type of the exchange.</param>
@@ -451,32 +445,6 @@ Task QueueUnbindAsync(string queue, string exchange, string routingKey,
451445
/// <param name="cancellationToken">The cancellation token.</param>
452446
Task TxSelectAsync(CancellationToken cancellationToken = default);
453447

454-
/// <summary>
455-
/// Asynchronously wait until all published messages on this channel have been confirmed.
456-
/// </summary>
457-
/// <returns>True if no nacks were received within the timeout, otherwise false.</returns>
458-
/// <param name="cancellationToken">The cancellation token.</param>
459-
/// <remarks>
460-
/// Waits until all messages published on this channel since the last call have
461-
/// been either ack'd or nack'd by the server. Returns whether
462-
/// all the messages were ack'd (and none were nack'd).
463-
/// Throws an exception when called on a channel
464-
/// that does not have publisher confirms enabled.
465-
/// </remarks>
466-
Task<bool> WaitForConfirmsAsync(CancellationToken cancellationToken = default);
467-
468-
/// <summary>
469-
/// Wait until all published messages on this channel have been confirmed.
470-
/// </summary>
471-
/// <param name="cancellationToken">The cancellation token.</param>
472-
/// <remarks>
473-
/// Waits until all messages published on this channel since the last call have
474-
/// been ack'd by the server. If a nack is received or the timeout
475-
/// elapses, throws an IOException exception immediately and closes
476-
/// the channel.
477-
/// </remarks>
478-
Task WaitForConfirmsOrDieAsync(CancellationToken cancellationToken = default);
479-
480448
/// <summary>
481449
/// Amount of time protocol operations (e.g. <code>queue.declare</code>) are allowed to take before
482450
/// timing out.

projects/RabbitMQ.Client/IConnection.cs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -240,18 +240,11 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
240240
/// <summary>
241241
/// Asynchronously create and return a fresh channel, session, and channel.
242242
/// </summary>
243-
/// <param name="consumerDispatchConcurrency">
244-
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
245-
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
246-
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
247-
///
248-
/// Defaults to <c>null</c>, which will use the value from <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
249-
///
250-
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
251-
/// In addition to that consumers need to be thread/concurrency safe.
243+
/// <param name="options">
244+
/// The channel creation options.
252245
/// </param>
253246
/// <param name="cancellationToken">Cancellation token</param>
254-
Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
247+
Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = default,
255248
CancellationToken cancellationToken = default);
256249
}
257250
}

0 commit comments

Comments
 (0)