Skip to content

Commit b057ca2

Browse files
committed
Serialize sequence number as long or string.
Fixes issue raised in #1805 Encode publish sequence number in x-dotnet-pub-seq-no as a `long` instead of ASCII string, unless value is greater than `long.MaxValue`.
1 parent 85fd42f commit b057ca2

File tree

3 files changed

+72
-19
lines changed

3 files changed

+72
-19
lines changed

projects/Applications/PublisherConfirms/PublisherConfirms.cs

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33-
using System.Buffers.Binary;
3433
using System.Collections.Generic;
3534
using System.Diagnostics;
3635
using System.Text;
3736
using System.Threading;
3837
using System.Threading.Tasks;
3938
using RabbitMQ.Client;
39+
using RabbitMQ.Client.Exceptions;
4040

4141
const ushort MAX_OUTSTANDING_CONFIRMS = 256;
4242

@@ -124,11 +124,33 @@ async Task PublishMessagesInBatchAsync()
124124
var sw = new Stopwatch();
125125
sw.Start();
126126

127+
channel.BasicReturnAsync += (sender, ea) =>
128+
{
129+
ulong sequenceNumber = 0;
130+
131+
IReadOnlyBasicProperties props = ea.BasicProperties;
132+
if (props.Headers is not null)
133+
{
134+
object? maybeSeqNum = props.Headers[Constants.PublishSequenceNumberHeader];
135+
if (maybeSeqNum is long longSequenceNumber)
136+
{
137+
sequenceNumber = (ulong)longSequenceNumber;
138+
}
139+
}
140+
141+
return Console.Out.WriteLineAsync($"{DateTime.Now} [INFO] message sequence number '{sequenceNumber}' has been basic.return-ed");
142+
};
143+
127144
var publishTasks = new List<ValueTask>();
128145
for (int i = 0; i < MESSAGE_COUNT; i++)
129146
{
147+
string rk = queueName;
148+
if (i % 1000 == 0)
149+
{
150+
rk = Guid.NewGuid().ToString();
151+
}
130152
byte[] body = Encoding.UTF8.GetBytes(i.ToString());
131-
publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props));
153+
publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true, basicProperties: props));
132154
outstandingMessageCount++;
133155

134156
if (outstandingMessageCount == batchSize)
@@ -139,9 +161,13 @@ async Task PublishMessagesInBatchAsync()
139161
{
140162
await pt;
141163
}
164+
catch (PublishException pex)
165+
{
166+
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, pex.IsReturn: '{pex.IsReturn}', seq no: '{pex.PublishSequenceNumber}'");
167+
}
142168
catch (Exception ex)
143169
{
144-
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
170+
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw exception, ex: '{ex}'");
145171
}
146172
}
147173
publishTasks.Clear();
@@ -157,9 +183,13 @@ async Task PublishMessagesInBatchAsync()
157183
{
158184
await pt;
159185
}
186+
catch (PublishException pex)
187+
{
188+
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, pex.IsReturn: '{pex.IsReturn}', seq no: '{pex.PublishSequenceNumber}'");
189+
}
160190
catch (Exception ex)
161191
{
162-
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
192+
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw exception, ex: '{ex}'");
163193
}
164194
}
165195
publishTasks.Clear();
@@ -236,22 +266,23 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
236266
}
237267
}
238268

239-
channel.BasicReturnAsync += (sender, ea) =>
269+
channel.BasicReturnAsync += async (sender, ea) =>
240270
{
241271
ulong sequenceNumber = 0;
242272

243273
IReadOnlyBasicProperties props = ea.BasicProperties;
244274
if (props.Headers is not null)
245275
{
246276
object? maybeSeqNum = props.Headers[Constants.PublishSequenceNumberHeader];
247-
if (maybeSeqNum is not null)
277+
if (maybeSeqNum is long longSequenceNumber)
248278
{
249-
sequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum);
279+
sequenceNumber = (ulong)longSequenceNumber;
250280
}
251281
}
252282

253-
Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number {sequenceNumber} has been basic.return-ed");
254-
return CleanOutstandingConfirms(sequenceNumber, false);
283+
await Console.Out.WriteLineAsync($"{DateTime.Now} [INFO] message sequence number '{sequenceNumber}' has been basic.return-ed");
284+
285+
await CleanOutstandingConfirms(sequenceNumber, false);
255286
};
256287

257288
channel.BasicAcksAsync += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
@@ -290,13 +321,21 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
290321
// This will cause a basic.return, for fun
291322
rk = Guid.NewGuid().ToString();
292323
}
324+
325+
var msgProps = new BasicProperties
326+
{
327+
Persistent = true,
328+
Headers = new Dictionary<string, object?>()
329+
};
330+
331+
msgProps.Headers.Add(Constants.PublishSequenceNumberHeader, (long)nextPublishSeqNo);
332+
293333
(ulong, ValueTask) data =
294-
(nextPublishSeqNo, channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true, basicProperties: props));
334+
(nextPublishSeqNo, channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true, basicProperties: msgProps));
295335
publishTasks.Add(data);
296336
}
297337

298338
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
299-
// await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
300339
foreach ((ulong SeqNo, ValueTask PublishTask) datum in publishTasks)
301340
{
302341
try

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
using System.Threading;
3737
using System.Threading.Tasks;
3838
using RabbitMQ.Client.Framing;
39-
using RabbitMQ.Client.Util;
4039

4140
namespace RabbitMQ.Client.Impl
4241
{
@@ -217,9 +216,14 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
217216
{
218217
if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled)
219218
{
220-
byte[] publishSequenceNumberBytes = new byte[8];
221-
NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.GetStart(), publishSequenceNumber);
222-
headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes;
219+
if (publishSequenceNumber > long.MaxValue)
220+
{
221+
headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumber.ToString();
222+
}
223+
else
224+
{
225+
headers[Constants.PublishSequenceNumberHeader] = (long)publishSequenceNumber;
226+
}
223227
}
224228
}
225229
}

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@
3434
using System.Collections.Generic;
3535
using System.Diagnostics;
3636
using System.Runtime.CompilerServices;
37+
using System.Text;
3738
using System.Threading;
3839
using System.Threading.RateLimiting;
3940
using System.Threading.Tasks;
4041
using RabbitMQ.Client.Events;
4142
using RabbitMQ.Client.Exceptions;
4243
using RabbitMQ.Client.Framing;
43-
using RabbitMQ.Client.Util;
4444

4545
namespace RabbitMQ.Client.Impl
4646
{
@@ -233,10 +233,20 @@ private void HandleReturn(BasicReturnEventArgs basicReturnEvent)
233233
ulong publishSequenceNumber = 0;
234234
IReadOnlyBasicProperties props = basicReturnEvent.BasicProperties;
235235
object? maybeSeqNum = props.Headers?[Constants.PublishSequenceNumberHeader];
236-
if (maybeSeqNum != null &&
237-
maybeSeqNum is byte[] seqNumBytes)
236+
if (maybeSeqNum != null)
238237
{
239-
publishSequenceNumber = NetworkOrderDeserializer.ReadUInt64(seqNumBytes);
238+
switch (maybeSeqNum)
239+
{
240+
case long seqNumLong:
241+
publishSequenceNumber = (ulong)seqNumLong;
242+
break;
243+
case string seqNumString:
244+
publishSequenceNumber = ulong.Parse(seqNumString);
245+
break;
246+
case byte[] seqNumBytes:
247+
publishSequenceNumber = ulong.Parse(Encoding.ASCII.GetString(seqNumBytes));
248+
break;
249+
}
240250
}
241251

242252
HandleNack(publishSequenceNumber, multiple: false, isReturn: true);

0 commit comments

Comments
 (0)