Skip to content

Ensures connection recovery does not keep going after the connection … #295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions projects/client/RabbitMQ.Client/src/client/api/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public interface IConnection : NetworkConnection, IDisposable
/// <remarks>
/// Note that all active channels, sessions, and models will be closed if this method is called.
/// In comparison to normal <see cref="Close()"/> method, <see cref="Abort()"/> will not throw
/// <see cref="AlreadyClosedException"/> or <see cref="IOException"/> during closing connection.
/// <see cref="IOException"/> during closing connection.
///This method waits infinitely for the in-progress close operation to complete.
/// </remarks>
void Abort();
Expand All @@ -206,7 +206,7 @@ public interface IConnection : NetworkConnection, IDisposable
/// The method behaves in the same way as <see cref="Abort()"/>, with the only
/// difference that the connection is closed with the given connection close code and message.
/// <para>
/// The close code (See under "Reply Codes" in the AMQP specification)
/// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification)
/// </para>
/// <para>
/// A message indicating the reason for closing the connection
Expand Down Expand Up @@ -253,7 +253,7 @@ public interface IConnection : NetworkConnection, IDisposable
/// closed if this method is called. It will wait for the in-progress
/// close operation to complete. This method will not return to the caller
/// until the shutdown is complete. If the connection is already closed
/// (or closing), then this method will throw <see cref="AlreadyClosedException"/>.
/// (or closing), then this method will do nothing.
/// It can also throw <see cref="IOException"/> when socket was closed unexpectedly.
/// </remarks>
void Close();
Expand Down Expand Up @@ -281,7 +281,7 @@ public interface IConnection : NetworkConnection, IDisposable
/// Note that all active channels, sessions, and models will be
/// closed if this method is called. It will wait for the in-progress
/// close operation to complete with a timeout. If the connection is
/// already closed (or closing), then this method will throw <see cref="AlreadyClosedException"/>.
/// already closed (or closing), then this method will do nothing.
/// It can also throw <see cref="IOException"/> when socket was closed unexpectedly.
/// If timeout is reached and the close operations haven't finished, then socket is forced to close.
/// <para>
Expand All @@ -298,7 +298,7 @@ public interface IConnection : NetworkConnection, IDisposable
/// The method behaves in the same way as <see cref="Close(int)"/>, with the only
/// difference that the connection is closed with the given connection close code and message.
/// <para>
/// The close code (See under "Reply Codes" in the AMQP specification).
/// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification).
/// </para>
/// <para>
/// A message indicating the reason for closing the connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,14 +397,14 @@ public void BeginAutomaticRecovery()

recoveryTaskFactory.StartNew(() =>
{
if(!self.ManuallyClosed)
if (!self.ManuallyClosed)
{
try
{
#if NETFX_CORE
System.Threading.Tasks.Task.Delay(m_factory.NetworkRecoveryInterval).Wait();
System.Threading.Tasks.Task.Delay(m_factory.NetworkRecoveryInterval).Wait();
#else
Thread.Sleep(m_factory.NetworkRecoveryInterval);
Thread.Sleep(m_factory.NetworkRecoveryInterval);
#endif
self.PerformAutomaticRecovery();
}
Expand All @@ -422,19 +422,21 @@ protected void PerformAutomaticRecovery()
{
lock (recoveryLockTarget)
{
RecoverConnectionDelegate();
RecoverConnectionShutdownHandlers();
RecoverConnectionBlockedHandlers();
RecoverConnectionUnblockedHandlers();

RecoverModels();
if (m_factory.TopologyRecoveryEnabled)
if (RecoverConnectionDelegate())
{
RecoverEntities();
RecoverConsumers();
}
RecoverConnectionShutdownHandlers();
RecoverConnectionBlockedHandlers();
RecoverConnectionUnblockedHandlers();

RecoverModels();
if (m_factory.TopologyRecoveryEnabled)
{
RecoverEntities();
RecoverConsumers();
}

RunRecoveryEventHandlers();
RunRecoveryEventHandlers();
}
}
}

Expand Down Expand Up @@ -659,56 +661,64 @@ private void Init(IFrameHandler fh)
public void Abort()
{
this.ManuallyClosed = true;
m_delegate.Abort();
if(m_delegate.IsOpen)
m_delegate.Abort();
}

///<summary>API-side invocation of connection abort.</summary>
public void Abort(ushort reasonCode, string reasonText)
{
this.ManuallyClosed = true;
m_delegate.Abort(reasonCode, reasonText);
if (m_delegate.IsOpen)
m_delegate.Abort(reasonCode, reasonText);
}

///<summary>API-side invocation of connection abort with timeout.</summary>
public void Abort(int timeout)
{
this.ManuallyClosed = true;
m_delegate.Abort(timeout);
if (m_delegate.IsOpen)
m_delegate.Abort(timeout);
}

///<summary>API-side invocation of connection abort with timeout.</summary>
public void Abort(ushort reasonCode, string reasonText, int timeout)
{
this.ManuallyClosed = true;
m_delegate.Abort(reasonCode, reasonText, timeout);
if (m_delegate.IsOpen)
m_delegate.Abort(reasonCode, reasonText, timeout);
}

///<summary>API-side invocation of connection.close.</summary>
public void Close()
{
this.ManuallyClosed = true;
m_delegate.Close();
if (m_delegate.IsOpen)
m_delegate.Close();
}

///<summary>API-side invocation of connection.close.</summary>
public void Close(ushort reasonCode, string reasonText)
{
this.ManuallyClosed = true;
m_delegate.Close(reasonCode, reasonText);
if (m_delegate.IsOpen)
m_delegate.Close(reasonCode, reasonText);
}

///<summary>API-side invocation of connection.close with timeout.</summary>
public void Close(int timeout)
{
this.ManuallyClosed = true;
m_delegate.Close(timeout);
if (m_delegate.IsOpen)
m_delegate.Close(timeout);
}

///<summary>API-side invocation of connection.close with timeout.</summary>
public void Close(ushort reasonCode, string reasonText, int timeout)
{
this.ManuallyClosed = true;
m_delegate.Close(reasonCode, reasonText, timeout);
if (m_delegate.IsOpen)
m_delegate.Close(reasonCode, reasonText, timeout);
}

public IModel CreateModel()
Expand Down Expand Up @@ -736,23 +746,17 @@ public void HandleConnectionUnblocked()

void IDisposable.Dispose()
{
try {
try
{
Abort();
}
finally
catch(Exception)
{
m_models.Clear();
// TODO: log
}
if (ShutdownReport.Count > 0)
finally
{
foreach (ShutdownReportEntry entry in ShutdownReport)
{
if (entry.Exception != null)
{
throw entry.Exception;
}
}
throw new OperationInterruptedException(null);
m_models.Clear();
}
}

Expand Down Expand Up @@ -826,16 +830,15 @@ protected void RecoverConnectionBlockedHandlers()
}
}

protected void RecoverConnectionDelegate()
protected bool RecoverConnectionDelegate()
{
bool recovering = true;
while (recovering)
while (!ManuallyClosed)
{
try
{
var fh = endpoints.SelectOne(m_factory.CreateFrameHandler);
m_delegate = new Connection(m_factory, false, fh, this.ClientProvidedName);
recovering = false;
return true;
}
catch (Exception e)
{
Expand Down Expand Up @@ -866,6 +869,8 @@ protected void RecoverConnectionDelegate()
// TODO: provide a way to handle these exceptions
}
}

return false;
}

protected void RecoverConnectionShutdownHandlers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1240,9 +1240,9 @@ public void HandleConnectionUnblocked()

void IDisposable.Dispose()
{
MaybeStopHeartbeatTimers();
try
{
MaybeStopHeartbeatTimers();
Abort();
}
catch (OperationInterruptedException)
Expand Down
20 changes: 19 additions & 1 deletion projects/client/Unit/src/unit/TestConnectionRecovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void TestBasicConnectionRecoveryWithHostnameList()
public void TestBasicConnectionRecoveryWithHostnameListAndUnreachableHosts()
{
using(var c = CreateAutorecoveringConnection(new List<string> { "191.72.44.22", "127.0.0.1", "localhost" }))
{
{
Assert.IsTrue(c.IsOpen);
CloseAndWaitForRecovery(c);
Assert.IsTrue(c.IsOpen);
Expand Down Expand Up @@ -189,6 +189,24 @@ public void TestBasicConnectionRecoveryErrorEvent()
}
}

[Test]
public void TestBasicConnectionRecoveryStopsAfterManualClose()
{
Assert.IsTrue(Conn.IsOpen);
var c = CreateAutorecoveringConnection();
var latch = new AutoResetEvent(false);
c.ConnectionRecoveryError += (o, args) => latch.Set();
StopRabbitMQ();
latch.WaitOne(30000); // we got the failed reconnection event.
var triedRecoveryAfterClose = false;
c.Close();
Thread.Sleep(5000);
c.ConnectionRecoveryError += (o, args) => triedRecoveryAfterClose = true;
Thread.Sleep(10000);
Assert.IsFalse(triedRecoveryAfterClose);
StartRabbitMQ();
}

[Test]
public void TestBasicConnectionRecoveryWithEndpointListAndUnreachableHosts()
{
Expand Down