Skip to content

Commit a91a86a

Browse files
committed
merge bug21377 into default
2 parents 5c92fe1 + 86a2c2c commit a91a86a

File tree

4 files changed

+151
-5
lines changed

4 files changed

+151
-5
lines changed

projects/client/ApigenBootstrap/RabbitMQ.Client.ApigenBootstrap.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@
9090
<Compile Include="..\RabbitMQ.Client\src\client\events\ModelShutdownEventHandler.cs">
9191
<Link>src\client\events\ModelShutdownEventHandler.cs</Link>
9292
</Compile>
93+
<Compile Include="..\RabbitMQ.Client\src\client\events\FlowControlEventHandler.cs">
94+
<Link>src\client\events\FlowControlEventHandler.cs</Link>
95+
</Compile>
9396
<Compile Include="properties\AssemblyInfo.cs" />
9497
</ItemGroup>
9598

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ public interface IModel: IDisposable
9393
///</remarks>
9494
event CallbackExceptionEventHandler CallbackException;
9595

96+
event FlowControlEventHandler FlowControl;
97+
9698
///<summary>All messages received before this fires that haven't been
9799
///ack'ed will be redelivered. All messages received afterwards won't
98100
///be.
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial
8+
// Technologies LLC., and Rabbit Technologies Ltd.
9+
//
10+
// Licensed under the Apache License, Version 2.0 (the "License");
11+
// you may not use this file except in compliance with the License.
12+
// You may obtain a copy of the License at
13+
//
14+
// http://www.apache.org/licenses/LICENSE-2.0
15+
//
16+
// Unless required by applicable law or agreed to in writing, software
17+
// distributed under the License is distributed on an "AS IS" BASIS,
18+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19+
// See the License for the specific language governing permissions and
20+
// limitations under the License.
21+
//---------------------------------------------------------------------------
22+
//
23+
// The MPL v1.1:
24+
//
25+
//---------------------------------------------------------------------------
26+
// The contents of this file are subject to the Mozilla Public License
27+
// Version 1.1 (the "License"); you may not use this file except in
28+
// compliance with the License. You may obtain a copy of the License at
29+
// http://www.rabbitmq.com/mpl.html
30+
//
31+
// Software distributed under the License is distributed on an "AS IS"
32+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33+
// License for the specific language governing rights and limitations
34+
// under the License.
35+
//
36+
// The Original Code is The RabbitMQ .NET Client.
37+
//
38+
// The Initial Developers of the Original Code are LShift Ltd,
39+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
40+
//
41+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
42+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
43+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
44+
// Technologies LLC, and Rabbit Technologies Ltd.
45+
//
46+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
47+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
48+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
49+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
50+
// (C) 2007-2010 Rabbit Technologies Ltd.
51+
//
52+
// All Rights Reserved.
53+
//
54+
// Contributor(s): ______________________________________.
55+
//
56+
//---------------------------------------------------------------------------
57+
58+
using System;
59+
60+
namespace RabbitMQ.Client.Events
61+
{
62+
63+
///<summary>Delegate used to process flow control events.</summary>
64+
public delegate void FlowControlEventHandler(IModel sender, FlowControlEventArgs args);
65+
66+
///<summary>Event relating to flow control</summary>
67+
public class FlowControlEventArgs : EventArgs
68+
{
69+
private readonly bool m_active;
70+
71+
public FlowControlEventArgs(bool active)
72+
{
73+
m_active = active;
74+
}
75+
76+
///<summary>Access the flow control setting</summary>
77+
public bool Active { get { return m_active; } }
78+
}
79+
}

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,11 @@ public abstract class ModelBase : IFullModel
8282
private readonly object m_eventLock = new object();
8383
private BasicReturnEventHandler m_basicReturn;
8484
private CallbackExceptionEventHandler m_callbackException;
85+
private FlowControlEventHandler m_flowControl;
8586
private BasicRecoverOkEventHandler m_basicRecoverOk;
8687

8788
public ManualResetEvent m_flowControlBlock = new ManualResetEvent(true);
89+
private readonly object m_flowSendLock = new object();
8890

8991
public event ModelShutdownEventHandler ModelShutdown
9092
{
@@ -149,6 +151,24 @@ public event CallbackExceptionEventHandler CallbackException
149151
}
150152
}
151153

154+
public event FlowControlEventHandler FlowControl
155+
{
156+
add
157+
{
158+
lock (m_eventLock)
159+
{
160+
m_flowControl += value;
161+
}
162+
}
163+
remove
164+
{
165+
lock (m_eventLock)
166+
{
167+
m_flowControl -= value;
168+
}
169+
}
170+
}
171+
152172
public event BasicRecoverOkEventHandler BasicRecoverOk
153173
{
154174
add
@@ -298,6 +318,31 @@ public virtual void OnCallbackException(CallbackExceptionEventArgs args)
298318
}
299319
}
300320

321+
public virtual void OnFlowControl(FlowControlEventArgs args)
322+
{
323+
FlowControlEventHandler handler;
324+
lock (m_eventLock)
325+
{
326+
handler = m_flowControl;
327+
}
328+
if (handler != null)
329+
{
330+
foreach (FlowControlEventHandler h in handler.GetInvocationList())
331+
{
332+
try
333+
{
334+
h(this, args);
335+
}
336+
catch (Exception e)
337+
{
338+
CallbackExceptionEventArgs exnArgs = new CallbackExceptionEventArgs(e);
339+
exnArgs.Detail["context"] = "OnFlowControl";
340+
OnCallbackException(exnArgs);
341+
}
342+
}
343+
}
344+
}
345+
301346
public virtual void OnBasicRecoverOk(EventArgs args)
302347
{
303348
BasicRecoverOkEventHandler handler;
@@ -373,10 +418,18 @@ public bool IsOpen
373418

374419
public void ModelSend(MethodBase method, ContentHeaderBase header, byte[] body)
375420
{
376-
if (method.HasContent) {
377-
m_flowControlBlock.WaitOne();
421+
if (method.HasContent)
422+
{
423+
lock (m_flowSendLock)
424+
{
425+
m_flowControlBlock.WaitOne();
426+
m_session.Transmit(new Command(method, header, body));
427+
}
428+
}
429+
else
430+
{
431+
m_session.Transmit(new Command(method, header, body));
378432
}
379-
m_session.Transmit(new Command(method, header, body));
380433
}
381434

382435
public MethodBase ModelRpc(MethodBase method, ContentHeaderBase header, byte[] body)
@@ -451,10 +504,19 @@ public void HandleBasicReturn(ushort replyCode,
451504
public void HandleChannelFlow(bool active)
452505
{
453506
if (active)
507+
{
454508
m_flowControlBlock.Set();
509+
_Private_ChannelFlowOk(active);
510+
}
455511
else
456-
m_flowControlBlock.Reset();
457-
_Private_ChannelFlowOk(active);
512+
{
513+
lock (m_flowSendLock)
514+
{
515+
m_flowControlBlock.Reset();
516+
_Private_ChannelFlowOk(active);
517+
}
518+
}
519+
OnFlowControl(new FlowControlEventArgs(active));
458520
}
459521

460522
public void HandleConnectionStart(byte versionMajor,

0 commit comments

Comments
 (0)