Skip to content

Commit 2746891

Browse files
Merge pull request #368 from rabbitmq/rabbitmq-dotnet-client-364
Introduce IBasicPublishBatch
2 parents de34bca + 4e655c2 commit 2746891

File tree

11 files changed

+280
-5
lines changed

11 files changed

+280
-5
lines changed

projects/client/ApigenBootstrap/ApigenBootstrap.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
</PropertyGroup>
1616

1717
<ItemGroup>
18-
<Compile Include="..\RabbitMQ.Client\src\client\api\AmqpTimestamp.cs;..\RabbitMQ.Client\src\client\api\IBasicConsumer.cs;..\RabbitMQ.Client\src\client\api\IBasicProperties.cs;..\RabbitMQ.Client\src\client\api\IContentHeader.cs;..\RabbitMQ.Client\src\client\api\IModel.cs;..\RabbitMQ.Client\src\client\api\PublicationAddress.cs;..\RabbitMQ.Client\src\client\api\BasicGetResult.cs;..\RabbitMQ.Client\src\client\api\QueueDeclareOk.cs;..\RabbitMQ.Client\src\client\api\ShutdownEventArgs.cs;..\RabbitMQ.Client\src\client\api\ShutdownInitiator.cs;..\RabbitMQ.Client\src\client\events\BasicReturnEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicAckEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicNackEventArgs.cs;..\RabbitMQ.Client\src\client\events\CallbackExceptionEventArgs.cs;..\RabbitMQ.Client\src\client\events\ConsumerEventArgs.cs;..\RabbitMQ.Client\src\client\events\FlowControlEventArgs.cs;..\RabbitMQ.Client\src\client\impl\IFullModel.cs" />
18+
<Compile Include="..\RabbitMQ.Client\src\client\api\AmqpTimestamp.cs;..\RabbitMQ.Client\src\client\api\IBasicConsumer.cs;..\RabbitMQ.Client\src\client\api\IBasicProperties.cs;..\RabbitMQ.Client\src\client\api\IContentHeader.cs;..\RabbitMQ.Client\src\client\api\IModel.cs;..\RabbitMQ.Client\src\client\api\PublicationAddress.cs;..\RabbitMQ.Client\src\client\api\IBasicPublishBatch.cs;..\RabbitMQ.Client\src\client\api\BasicGetResult.cs;..\RabbitMQ.Client\src\client\api\QueueDeclareOk.cs;..\RabbitMQ.Client\src\client\api\ShutdownEventArgs.cs;..\RabbitMQ.Client\src\client\api\ShutdownInitiator.cs;..\RabbitMQ.Client\src\client\events\BasicReturnEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicAckEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicNackEventArgs.cs;..\RabbitMQ.Client\src\client\events\CallbackExceptionEventArgs.cs;..\RabbitMQ.Client\src\client\events\ConsumerEventArgs.cs;..\RabbitMQ.Client\src\client\events\FlowControlEventArgs.cs;..\RabbitMQ.Client\src\client\impl\IFullModel.cs" />
1919
</ItemGroup>
2020

2121
</Project>
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2016 Pivotal Software, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// http://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at http://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is Pivotal Software, Inc.
38+
// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
namespace RabbitMQ.Client
41+
{
42+
public interface IBasicPublishBatch
43+
{
44+
void Add(string exchange, string routingKey, bool mandatory, IBasicProperties properties, byte[] body);
45+
void Publish();
46+
}
47+
}

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
using RabbitMQ.Client.Apigen.Attributes;
4242
using RabbitMQ.Client.Events;
43+
using RabbitMQ.Client;
4344
using System;
4445
using System.Collections.Generic;
4546

@@ -278,6 +279,12 @@ void BasicPublish(string exchange, string routingKey, bool mandatory,
278279
[AmqpMethodDoNotImplement(null)]
279280
void ConfirmSelect();
280281

282+
/// <summary>
283+
/// Creates a BasicPublishBatch instance
284+
/// </summary>
285+
[AmqpMethodDoNotImplement(null)]
286+
IBasicPublishBatch CreateBasicPublishBatch();
287+
281288
/// <summary>
282289
/// Construct a completely empty content header for use with the Basic content class.
283290
/// </summary>

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringModel.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1212,5 +1212,10 @@ protected void RunRecoveryEventHandlers()
12121212
}
12131213
}
12141214
}
1215+
1216+
public IBasicPublishBatch CreateBasicPublishBatch()
1217+
{
1218+
return ((IFullModel)m_delegate).CreateBasicPublishBatch();
1219+
}
12151220
}
12161221
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2016 Pivotal Software, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// http://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at http://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is Pivotal Software, Inc.
38+
// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
41+
namespace RabbitMQ.Client.Impl
42+
{
43+
using System.Collections.Generic;
44+
using RabbitMQ.Client;
45+
using RabbitMQ.Client.Framing.Impl;
46+
using RabbitMQ.Client.Impl;
47+
48+
public class BasicPublishBatch : IBasicPublishBatch
49+
{
50+
private List<Command> commands = new List<Command>();
51+
private ModelBase model;
52+
internal BasicPublishBatch (ModelBase model)
53+
{
54+
this.model = model;
55+
}
56+
57+
public void Add(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, byte[] body)
58+
{
59+
var bp = basicProperties == null ? model.CreateBasicProperties() : basicProperties;
60+
var method = new BasicPublish
61+
{
62+
m_exchange = exchange,
63+
m_routingKey = routingKey,
64+
m_mandatory = mandatory
65+
};
66+
67+
commands.Add(new Command(method, (ContentHeaderBase)bp, body));
68+
}
69+
70+
public void Publish()
71+
{
72+
model.SendCommands(commands);
73+
}
74+
}
75+
}

projects/client/RabbitMQ.Client/src/client/impl/Command.cs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ public void Transmit(int channelNumber, Connection connection)
140140
}
141141
}
142142

143+
144+
143145
public void TransmitAsSingleFrame(int channelNumber, Connection connection)
144146
{
145147
connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method));
@@ -166,5 +168,32 @@ public void TransmitAsFrameSet(int channelNumber, Connection connection)
166168

167169
connection.WriteFrameSet(frames);
168170
}
171+
172+
173+
public static List<OutboundFrame> CalculateFrames(int channelNumber, Connection connection, IList<Command> commands)
174+
{
175+
var frames = new List<OutboundFrame>();
176+
177+
foreach (var cmd in commands)
178+
{
179+
frames.Add(new MethodOutboundFrame(channelNumber, cmd.Method));
180+
if (cmd.Method.HasContent)
181+
{
182+
var body = cmd.Body;// var body = ConsolidateBody(); // Cache, since the property is compiled.
183+
184+
frames.Add(new HeaderOutboundFrame(channelNumber, cmd.Header, body.Length));
185+
var frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
186+
var bodyPayloadMax = (frameMax == 0) ? body.Length : frameMax - EmptyFrameSize;
187+
for (int offset = 0; offset < body.Length; offset += bodyPayloadMax)
188+
{
189+
var remaining = body.Length - offset;
190+
var count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
191+
frames.Add(new BodySegmentOutboundFrame(channelNumber, body, offset, count));
192+
}
193+
}
194+
}
195+
196+
return frames;
197+
}
169198
}
170199
}

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,16 @@ public class Connection : IConnection
106106
private Timer _heartbeatWriteTimer;
107107
private Timer _heartbeatReadTimer;
108108
private AutoResetEvent m_heartbeatRead = new AutoResetEvent(false);
109-
private AutoResetEvent m_heartbeatWrite = new AutoResetEvent(false);
109+
110+
#if CORECLR
111+
private static string version = typeof(Connection).GetTypeInfo().Assembly
112+
.GetCustomAttribute<AssemblyInformationalVersionAttribute>()
113+
.InformationalVersion;
114+
#else
115+
private static string version = typeof(Connection).Assembly
116+
.GetCustomAttribute<AssemblyInformationalVersionAttribute>()
117+
.InformationalVersion;
118+
#endif
110119

111120
#if CORECLR
112121
private static string version = typeof(Connection).GetTypeInfo().Assembly
@@ -545,7 +554,6 @@ public void FinishClose()
545554
{
546555
// Notify hearbeat loops that they can leave
547556
m_heartbeatRead.Set();
548-
m_heartbeatWrite.Set();
549557
m_closed = true;
550558
MaybeStopHeartbeatTimers();
551559

@@ -1178,13 +1186,11 @@ public override string ToString()
11781186
public void WriteFrame(OutboundFrame f)
11791187
{
11801188
m_frameHandler.WriteFrame(f);
1181-
m_heartbeatWrite.Set();
11821189
}
11831190

11841191
public void WriteFrameSet(IList<OutboundFrame> f)
11851192
{
11861193
m_frameHandler.WriteFrameSet(f);
1187-
m_heartbeatWrite.Set();
11881194
}
11891195

11901196
///<summary>API-side invocation of connection abort.</summary>

projects/client/RabbitMQ.Client/src/client/impl/ISession.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
//---------------------------------------------------------------------------
4040

4141
using System;
42+
using System.Collections.Generic;
4243

4344
namespace RabbitMQ.Client.Impl
4445
{
@@ -79,5 +80,6 @@ public interface ISession
7980
void HandleFrame(InboundFrame frame);
8081
void Notify();
8182
void Transmit(Command cmd);
83+
void Transmit(IList<Command> cmd);
8284
}
8385
}

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1205,6 +1205,26 @@ public abstract void BasicNack(ulong deliveryTag,
12051205
bool multiple,
12061206
bool requeue);
12071207

1208+
internal void AllocatatePublishSeqNos(int count)
1209+
{
1210+
var c = 0;
1211+
lock (m_unconfirmedSet.SyncRoot)
1212+
{
1213+
while(c < count)
1214+
{
1215+
if (NextPublishSeqNo > 0)
1216+
{
1217+
if (!m_unconfirmedSet.Contains(NextPublishSeqNo))
1218+
{
1219+
m_unconfirmedSet.Add(NextPublishSeqNo);
1220+
}
1221+
NextPublishSeqNo++;
1222+
}
1223+
c++;
1224+
}
1225+
}
1226+
}
1227+
12081228
public void BasicPublish(string exchange,
12091229
string routingKey,
12101230
bool mandatory,
@@ -1273,6 +1293,10 @@ public void ConfirmSelect()
12731293
///////////////////////////////////////////////////////////////////////////
12741294

12751295
public abstract IBasicProperties CreateBasicProperties();
1296+
public IBasicPublishBatch CreateBasicPublishBatch()
1297+
{
1298+
return new BasicPublishBatch(this);
1299+
}
12761300

12771301

12781302
public void ExchangeBind(string destination,
@@ -1496,6 +1520,13 @@ public void WaitForConfirmsOrDie(TimeSpan timeout)
14961520
}
14971521
}
14981522

1523+
internal void SendCommands(IList<Command> commands)
1524+
{
1525+
m_flowControlBlock.WaitOne();
1526+
AllocatatePublishSeqNos(commands.Count);
1527+
Session.Transmit(commands);
1528+
}
1529+
14991530
protected virtual void handleAckNack(ulong deliveryTag, bool multiple, bool isNack)
15001531
{
15011532
lock (m_unconfirmedSet.SyncRoot)
@@ -1534,6 +1565,7 @@ private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bo
15341565
return k.m_result;
15351566
}
15361567

1568+
15371569
public class BasicConsumerRpcContinuation : SimpleBlockingRpcContinuation
15381570
{
15391571
public IBasicConsumer m_consumer;

projects/client/RabbitMQ.Client/src/client/impl/SessionBase.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
using System;
4242
using RabbitMQ.Client.Exceptions;
4343
using RabbitMQ.Client.Framing.Impl;
44+
using System.Collections.Generic;
4445

4546
namespace RabbitMQ.Client.Impl
4647
{
@@ -199,5 +200,9 @@ public virtual void Transmit(Command cmd)
199200
// of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock.
200201
cmd.Transmit(ChannelNumber, Connection);
201202
}
203+
public virtual void Transmit(IList<Command> commands)
204+
{
205+
Connection.WriteFrameSet(Command.CalculateFrames(ChannelNumber, Connection, commands));
206+
}
202207
}
203208
}

0 commit comments

Comments
 (0)