Skip to content

Commit 4506de7

Browse files
committed
Serialize sequence number as long or string.
Fixes issue raised in #1805 Encode publish sequence number in x-dotnet-pub-seq-no as a `long` instead of ASCII string, unless value is greater than `long.MaxValue`.
1 parent 85fd42f commit 4506de7

File tree

3 files changed

+81
-18
lines changed

3 files changed

+81
-18
lines changed

projects/Applications/PublisherConfirms/PublisherConfirms.cs

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33-
using System.Buffers.Binary;
3433
using System.Collections.Generic;
3534
using System.Diagnostics;
3635
using System.Text;
3736
using System.Threading;
3837
using System.Threading.Tasks;
3938
using RabbitMQ.Client;
39+
using RabbitMQ.Client.Exceptions;
4040

4141
const ushort MAX_OUTSTANDING_CONFIRMS = 256;
4242

@@ -54,7 +54,7 @@
5454
Persistent = true
5555
};
5656

57-
string hostname = "localhost";
57+
string hostname = "shostakovich";
5858
if (args.Length > 0)
5959
{
6060
if (false == string.IsNullOrWhiteSpace(args[0]))
@@ -124,11 +124,43 @@ async Task PublishMessagesInBatchAsync()
124124
var sw = new Stopwatch();
125125
sw.Start();
126126

127+
channel.BasicReturnAsync += (sender, ea) =>
128+
{
129+
string sequenceNumber = string.Empty;
130+
131+
IReadOnlyBasicProperties props = ea.BasicProperties;
132+
if (props.Headers is not null)
133+
{
134+
object? maybeSeqNum = props.Headers[Constants.PublishSequenceNumberHeader];
135+
if (maybeSeqNum is not null)
136+
{
137+
switch (maybeSeqNum)
138+
{
139+
case byte[] seqNumBytes:
140+
sequenceNumber = Encoding.ASCII.GetString(seqNumBytes);
141+
break;
142+
case string seqNumStr:
143+
sequenceNumber = seqNumStr;
144+
break;
145+
}
146+
}
147+
}
148+
149+
Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number {sequenceNumber} has been basic.return-ed");
150+
151+
return Task.CompletedTask;
152+
};
153+
127154
var publishTasks = new List<ValueTask>();
128155
for (int i = 0; i < MESSAGE_COUNT; i++)
129156
{
157+
string rk = queueName;
158+
if (i % 1000 == 0)
159+
{
160+
rk = Guid.NewGuid().ToString();
161+
}
130162
byte[] body = Encoding.UTF8.GetBytes(i.ToString());
131-
publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props));
163+
publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true, basicProperties: props));
132164
outstandingMessageCount++;
133165

134166
if (outstandingMessageCount == batchSize)
@@ -139,9 +171,13 @@ async Task PublishMessagesInBatchAsync()
139171
{
140172
await pt;
141173
}
174+
catch (PublishException pex)
175+
{
176+
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, pex.IsReturn: '{pex.IsReturn}', seq no: '{pex.PublishSequenceNumber}'");
177+
}
142178
catch (Exception ex)
143179
{
144-
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
180+
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw exception, ex: '{ex}'");
145181
}
146182
}
147183
publishTasks.Clear();
@@ -157,9 +193,13 @@ async Task PublishMessagesInBatchAsync()
157193
{
158194
await pt;
159195
}
196+
catch (PublishException pex)
197+
{
198+
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, pex.IsReturn: '{pex.IsReturn}', seq no: '{pex.PublishSequenceNumber}'");
199+
}
160200
catch (Exception ex)
161201
{
162-
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
202+
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw exception, ex: '{ex}'");
163203
}
164204
}
165205
publishTasks.Clear();
@@ -244,13 +284,14 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
244284
if (props.Headers is not null)
245285
{
246286
object? maybeSeqNum = props.Headers[Constants.PublishSequenceNumberHeader];
247-
if (maybeSeqNum is not null)
287+
if (maybeSeqNum is long longSequenceNumber)
248288
{
249-
sequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum);
289+
sequenceNumber = (ulong)longSequenceNumber;
250290
}
251291
}
252292

253-
Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number {sequenceNumber} has been basic.return-ed");
293+
Console.WriteLine($"{DateTime.Now} [INFO] message sequence number '{sequenceNumber}' has been basic.return-ed");
294+
254295
return CleanOutstandingConfirms(sequenceNumber, false);
255296
};
256297

@@ -290,13 +331,21 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
290331
// This will cause a basic.return, for fun
291332
rk = Guid.NewGuid().ToString();
292333
}
334+
335+
var msgProps = new BasicProperties
336+
{
337+
Persistent = true,
338+
Headers = new Dictionary<string, object?>()
339+
};
340+
341+
msgProps.Headers.Add(Constants.PublishSequenceNumberHeader, (long)nextPublishSeqNo);
342+
293343
(ulong, ValueTask) data =
294-
(nextPublishSeqNo, channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true, basicProperties: props));
344+
(nextPublishSeqNo, channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true, basicProperties: msgProps));
295345
publishTasks.Add(data);
296346
}
297347

298348
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
299-
// await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
300349
foreach ((ulong SeqNo, ValueTask PublishTask) datum in publishTasks)
301350
{
302351
try

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
using System.Threading;
3737
using System.Threading.Tasks;
3838
using RabbitMQ.Client.Framing;
39-
using RabbitMQ.Client.Util;
4039

4140
namespace RabbitMQ.Client.Impl
4241
{
@@ -217,9 +216,14 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
217216
{
218217
if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled)
219218
{
220-
byte[] publishSequenceNumberBytes = new byte[8];
221-
NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.GetStart(), publishSequenceNumber);
222-
headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes;
219+
if (publishSequenceNumber > long.MaxValue)
220+
{
221+
headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumber.ToString();
222+
}
223+
else
224+
{
225+
headers[Constants.PublishSequenceNumberHeader] = (long)publishSequenceNumber;
226+
}
223227
}
224228
}
225229
}

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@
3434
using System.Collections.Generic;
3535
using System.Diagnostics;
3636
using System.Runtime.CompilerServices;
37+
using System.Text;
3738
using System.Threading;
3839
using System.Threading.RateLimiting;
3940
using System.Threading.Tasks;
4041
using RabbitMQ.Client.Events;
4142
using RabbitMQ.Client.Exceptions;
4243
using RabbitMQ.Client.Framing;
43-
using RabbitMQ.Client.Util;
4444

4545
namespace RabbitMQ.Client.Impl
4646
{
@@ -233,10 +233,20 @@ private void HandleReturn(BasicReturnEventArgs basicReturnEvent)
233233
ulong publishSequenceNumber = 0;
234234
IReadOnlyBasicProperties props = basicReturnEvent.BasicProperties;
235235
object? maybeSeqNum = props.Headers?[Constants.PublishSequenceNumberHeader];
236-
if (maybeSeqNum != null &&
237-
maybeSeqNum is byte[] seqNumBytes)
236+
if (maybeSeqNum != null)
238237
{
239-
publishSequenceNumber = NetworkOrderDeserializer.ReadUInt64(seqNumBytes);
238+
switch (maybeSeqNum)
239+
{
240+
case long seqNumLong:
241+
publishSequenceNumber = (ulong)seqNumLong;
242+
break;
243+
case string seqNumString:
244+
publishSequenceNumber = ulong.Parse(seqNumString);
245+
break;
246+
case byte[] seqNumBytes:
247+
publishSequenceNumber = ulong.Parse(Encoding.ASCII.GetString(seqNumBytes));
248+
break;
249+
}
240250
}
241251

242252
HandleNack(publishSequenceNumber, multiple: false, isReturn: true);

0 commit comments

Comments
 (0)