Skip to content

Commit 98299f8

Browse files
authored
Merge pull request #824 from stebet/arrayPoolOptimizations
Optimizing memory usage even further.
2 parents e95f22a + 5c6d3b0 commit 98299f8

14 files changed

+131
-119
lines changed

projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
using RabbitMQ.Client.Events;
77

8-
using TaskExtensions = RabbitMQ.Client.Impl.TaskExtensions;
9-
108
namespace RabbitMQ.Client
119
{
1210
public class AsyncDefaultBasicConsumer : IBasicConsumer, IAsyncBasicConsumer
@@ -96,7 +94,7 @@ public virtual Task HandleBasicConsumeOk(string consumerTag)
9694
{
9795
_consumerTags.Add(consumerTag);
9896
IsRunning = true;
99-
return TaskExtensions.CompletedTask;
97+
return Task.CompletedTask;
10098
}
10199

102100
/// <summary>
@@ -118,7 +116,7 @@ public virtual Task HandleBasicDeliver(string consumerTag,
118116
ReadOnlyMemory<byte> body)
119117
{
120118
// Nothing to do here.
121-
return TaskExtensions.CompletedTask;
119+
return Task.CompletedTask;
122120
}
123121

124122
/// <summary>

projects/RabbitMQ.Client/client/api/IBasicPublishBatch.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
// The Initial Developer of the Original Code is Pivotal Software, Inc.
3838
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3939
//---------------------------------------------------------------------------
40+
using System;
41+
4042
namespace RabbitMQ.Client
4143
{
4244
public interface IBasicPublishBatch

projects/RabbitMQ.Client/client/exceptions/UnexpectedFrameException.cs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,10 @@ namespace RabbitMQ.Client.Exceptions
4848
/// </summary>
4949
public class UnexpectedFrameException : HardProtocolException
5050
{
51-
internal UnexpectedFrameException(Frame frame)
52-
: base("A frame of this type was not expected at this time")
51+
internal UnexpectedFrameException(FrameType frameType) : base($"A frame of type {frameType} was not expected at this time")
5352
{
54-
Frame = frame;
5553
}
5654

57-
internal Frame Frame { get; }
58-
5955
public override ushort ReplyCode
6056
{
6157
get { return Constants.CommandInvalid; }

projects/RabbitMQ.Client/client/impl/AsyncConsumerDispatcher.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,12 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
4545
string exchange,
4646
string routingKey,
4747
IBasicProperties basicProperties,
48-
ReadOnlyMemory<byte> body)
48+
ReadOnlySpan<byte> body)
4949
{
50-
IMemoryOwner<byte> bodyCopy = MemoryPool<byte>.Shared.Rent(body.Length);
51-
body.CopyTo(bodyCopy.Memory);
52-
ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, bodyCopy, body.Length));
50+
byte[] bodyBytes = ArrayPool<byte>.Shared.Rent(body.Length);
51+
Memory<byte> bodyCopy = new Memory<byte>(bodyBytes, 0, body.Length);
52+
body.CopyTo(bodyCopy.Span);
53+
ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, bodyCopy));
5354
}
5455

5556
public void HandleBasicCancelOk(IBasicConsumer consumer, string consumerTag)

projects/RabbitMQ.Client/client/impl/BasicDeliver.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Buffers;
33
using System.Collections.Generic;
4+
using System.Runtime.InteropServices;
45
using System.Threading.Tasks;
56

67
using RabbitMQ.Client.Events;
@@ -15,8 +16,7 @@ sealed class BasicDeliver : Work
1516
readonly string _exchange;
1617
readonly string _routingKey;
1718
readonly IBasicProperties _basicProperties;
18-
readonly IMemoryOwner<byte> _body;
19-
readonly int _bodyLength;
19+
readonly ReadOnlyMemory<byte> _body;
2020

2121
public BasicDeliver(IBasicConsumer consumer,
2222
string consumerTag,
@@ -25,8 +25,7 @@ public BasicDeliver(IBasicConsumer consumer,
2525
string exchange,
2626
string routingKey,
2727
IBasicProperties basicProperties,
28-
IMemoryOwner<byte> body,
29-
int bodyLength) : base(consumer)
28+
ReadOnlyMemory<byte> body) : base(consumer)
3029
{
3130
_consumerTag = consumerTag;
3231
_deliveryTag = deliveryTag;
@@ -35,7 +34,6 @@ public BasicDeliver(IBasicConsumer consumer,
3534
_routingKey = routingKey;
3635
_basicProperties = basicProperties;
3736
_body = body;
38-
_bodyLength = bodyLength;
3937
}
4038

4139
protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
@@ -48,7 +46,7 @@ await consumer.HandleBasicDeliver(_consumerTag,
4846
_exchange,
4947
_routingKey,
5048
_basicProperties,
51-
_body.Memory.Slice(0, _bodyLength)).ConfigureAwait(false);
49+
_body).ConfigureAwait(false);
5250
}
5351
catch (Exception e)
5452
{
@@ -61,7 +59,10 @@ await consumer.HandleBasicDeliver(_consumerTag,
6159
}
6260
finally
6361
{
64-
_body.Dispose();
62+
if (MemoryMarshal.TryGetArray(_body, out ArraySegment<byte> segment))
63+
{
64+
ArrayPool<byte>.Shared.Return(segment.Array);
65+
}
6566
}
6667
}
6768
}

projects/RabbitMQ.Client/client/impl/BasicPublishBatch.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3939
//---------------------------------------------------------------------------
4040

41+
using System;
42+
using System.Buffers;
4143
using System.Collections.Generic;
4244

4345
using RabbitMQ.Client.Framing.Impl;
@@ -63,7 +65,7 @@ public void Add(string exchange, string routingKey, bool mandatory, IBasicProper
6365
_mandatory = mandatory
6466
};
6567

66-
_commands.Add(new Command(method, (ContentHeaderBase)bp, body));
68+
_commands.Add(new Command(method, (ContentHeaderBase)bp, body, false));
6769
}
6870

6971
public void Publish()

projects/RabbitMQ.Client/client/impl/Command.cs

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
using System;
4242
using System.Buffers;
4343
using System.Collections.Generic;
44+
using System.Runtime.InteropServices;
4445
using RabbitMQ.Client.Exceptions;
4546
using RabbitMQ.Client.Framing.Impl;
4647

@@ -54,31 +55,23 @@ class Command : IDisposable
5455
// - 4 bytes of frame payload length
5556
// - 1 byte of payload trailer FrameEnd byte
5657
private const int EmptyFrameSize = 8;
57-
private static readonly byte[] s_emptyByteArray = new byte[0];
58-
private readonly IMemoryOwner<byte> _body;
58+
private readonly bool _returnBufferOnDispose;
5959

6060
static Command()
6161
{
6262
CheckEmptyFrameSize();
6363
}
6464

65-
internal Command(MethodBase method) : this(method, null, null, 0)
65+
internal Command(MethodBase method) : this(method, null, null, false)
6666
{
6767
}
6868

69-
internal Command(MethodBase method, ContentHeaderBase header, ReadOnlyMemory<byte> body)
69+
public Command(MethodBase method, ContentHeaderBase header, ReadOnlyMemory<byte> body, bool returnBufferOnDispose)
7070
{
7171
Method = method;
7272
Header = header;
7373
Body = body;
74-
}
75-
76-
public Command(MethodBase method, ContentHeaderBase header, IMemoryOwner<byte> body, int bodySize)
77-
{
78-
Method = method;
79-
Header = header;
80-
_body = body;
81-
Body = _body?.Memory.Slice(0, bodySize) ?? s_emptyByteArray;
74+
_returnBufferOnDispose = returnBufferOnDispose;
8275
}
8376

8477
public ReadOnlyMemory<byte> Body { get; private set; }
@@ -167,9 +160,9 @@ internal static List<OutboundFrame> CalculateFrames(int channelNumber, Connectio
167160

168161
public void Dispose()
169162
{
170-
if (_body is IMemoryOwner<byte>)
163+
if(_returnBufferOnDispose && MemoryMarshal.TryGetArray(Body, out ArraySegment<byte> segment))
171164
{
172-
_body.Dispose();
165+
ArrayPool<byte>.Shared.Return(segment.Array);
173166
}
174167
}
175168
}

projects/RabbitMQ.Client/client/impl/CommandAssembler.cs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3939
//---------------------------------------------------------------------------
4040

41+
using System;
4142
using System.Buffers;
4243
using RabbitMQ.Client.Exceptions;
4344
using RabbitMQ.Client.Framing.Impl;
@@ -59,7 +60,7 @@ class CommandAssembler
5960

6061
public MethodBase m_method;
6162
public ContentHeaderBase m_header;
62-
public IMemoryOwner<byte> m_body;
63+
public Memory<byte> m_body;
6364
public ProtocolBase m_protocol;
6465
public int m_remainingBodyBytes;
6566
private int _offset;
@@ -78,39 +79,40 @@ public Command HandleFrame(InboundFrame f)
7879
case AssemblyState.ExpectingMethod:
7980
if (!f.IsMethod())
8081
{
81-
throw new UnexpectedFrameException(f);
82+
throw new UnexpectedFrameException(f.Type);
8283
}
8384
m_method = m_protocol.DecodeMethodFrom(f.Payload);
8485
m_state = m_method.HasContent ? AssemblyState.ExpectingContentHeader : AssemblyState.Complete;
8586
return CompletedCommand();
8687
case AssemblyState.ExpectingContentHeader:
8788
if (!f.IsHeader())
8889
{
89-
throw new UnexpectedFrameException(f);
90+
throw new UnexpectedFrameException(f.Type);
9091
}
9192
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(f.Payload));
9293
ulong totalBodyBytes = m_header.ReadFrom(f.Payload.Slice(2));
9394
if (totalBodyBytes > MaxArrayOfBytesSize)
9495
{
95-
throw new UnexpectedFrameException(f);
96+
throw new UnexpectedFrameException(f.Type);
9697
}
9798

9899
m_remainingBodyBytes = (int)totalBodyBytes;
99-
m_body = MemoryPool<byte>.Shared.Rent(m_remainingBodyBytes);
100+
byte[] bodyBytes = ArrayPool<byte>.Shared.Rent(m_remainingBodyBytes);
101+
m_body = new Memory<byte>(bodyBytes, 0, m_remainingBodyBytes);
100102
UpdateContentBodyState();
101103
return CompletedCommand();
102104
case AssemblyState.ExpectingContentBody:
103105
if (!f.IsBody())
104106
{
105-
throw new UnexpectedFrameException(f);
107+
throw new UnexpectedFrameException(f.Type);
106108
}
107109

108110
if (f.Payload.Length > m_remainingBodyBytes)
109111
{
110112
throw new MalformedFrameException($"Overlong content body received - {m_remainingBodyBytes} bytes remaining, {f.Payload.Length} bytes received");
111113
}
112114

113-
f.Payload.CopyTo(m_body.Memory.Slice(_offset));
115+
f.Payload.CopyTo(m_body.Slice(_offset));
114116
m_remainingBodyBytes -= f.Payload.Length;
115117
_offset += f.Payload.Length;
116118
UpdateContentBodyState();
@@ -125,7 +127,7 @@ private Command CompletedCommand()
125127
{
126128
if (m_state == AssemblyState.Complete)
127129
{
128-
Command result = new Command(m_method, m_header, m_body, _offset);
130+
Command result = new Command(m_method, m_header, m_body, true);
129131
Reset();
130132
return result;
131133
}

projects/RabbitMQ.Client/client/impl/ConcurrentConsumerDispatcher.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,11 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
6262
string exchange,
6363
string routingKey,
6464
IBasicProperties basicProperties,
65-
ReadOnlyMemory<byte> body)
65+
ReadOnlySpan<byte> body)
6666
{
67-
IMemoryOwner<byte> memoryCopy = MemoryPool<byte>.Shared.Rent(body.Length);
68-
body.CopyTo(memoryCopy.Memory);
67+
byte[] memoryCopyArray = ArrayPool<byte>.Shared.Rent(body.Length);
68+
Memory<byte> memoryCopy = new Memory<byte>(memoryCopyArray, 0, body.Length);
69+
body.CopyTo(memoryCopy.Span);
6970
UnlessShuttingDown(() =>
7071
{
7172
try
@@ -76,7 +77,7 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
7677
exchange,
7778
routingKey,
7879
basicProperties,
79-
memoryCopy.Memory.Slice(0, body.Length));
80+
memoryCopy);
8081
}
8182
catch (Exception e)
8283
{
@@ -89,7 +90,7 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
8990
}
9091
finally
9192
{
92-
memoryCopy.Dispose();
93+
ArrayPool<byte>.Shared.Return(memoryCopyArray);
9394
}
9495
});
9596
}

0 commit comments

Comments
 (0)