diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index 87a97eb54..d8b36cd5b 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -445,24 +445,15 @@ RabbitMQ.Client.IChannel.IsClosed.get -> bool RabbitMQ.Client.IChannel.IsOpen.get -> bool RabbitMQ.Client.IChannelExtensions RabbitMQ.Client.IConnection -RabbitMQ.Client.IConnection.CallbackException -> System.EventHandler RabbitMQ.Client.IConnection.ChannelMax.get -> ushort RabbitMQ.Client.IConnection.ClientProperties.get -> System.Collections.Generic.IDictionary RabbitMQ.Client.IConnection.ClientProvidedName.get -> string RabbitMQ.Client.IConnection.CloseReason.get -> RabbitMQ.Client.ShutdownEventArgs -RabbitMQ.Client.IConnection.ConnectionBlocked -> System.EventHandler -RabbitMQ.Client.IConnection.ConnectionRecoveryError -> System.EventHandler -RabbitMQ.Client.IConnection.ConnectionShutdown -> System.EventHandler -RabbitMQ.Client.IConnection.ConnectionUnblocked -> System.EventHandler -RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecovery -> System.EventHandler RabbitMQ.Client.IConnection.Endpoint.get -> RabbitMQ.Client.AmqpTcpEndpoint RabbitMQ.Client.IConnection.FrameMax.get -> uint RabbitMQ.Client.IConnection.Heartbeat.get -> System.TimeSpan RabbitMQ.Client.IConnection.IsOpen.get -> bool RabbitMQ.Client.IConnection.Protocol.get -> RabbitMQ.Client.IProtocol -RabbitMQ.Client.IConnection.QueueNameChangedAfterRecovery -> System.EventHandler -RabbitMQ.Client.IConnection.RecoveringConsumer -> System.EventHandler -RabbitMQ.Client.IConnection.RecoverySucceeded -> System.EventHandler RabbitMQ.Client.IConnection.ServerProperties.get -> System.Collections.Generic.IDictionary RabbitMQ.Client.IConnection.ShutdownReport.get -> System.Collections.Generic.IEnumerable RabbitMQ.Client.IConnectionExtensions @@ -895,3 +886,12 @@ RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, Syst const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +RabbitMQ.Client.IConnection.CallbackExceptionAsync -> RabbitMQ.Client.Events.AsyncEventHandler! +RabbitMQ.Client.IConnection.ConnectionBlockedAsync -> RabbitMQ.Client.Events.AsyncEventHandler! +RabbitMQ.Client.IConnection.ConnectionShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler! +RabbitMQ.Client.IConnection.RecoverySucceededAsync -> RabbitMQ.Client.Events.AsyncEventHandler! +RabbitMQ.Client.IConnection.ConnectionRecoveryErrorAsync -> RabbitMQ.Client.Events.AsyncEventHandler! +RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecoveryAsync -> RabbitMQ.Client.Events.AsyncEventHandler! +RabbitMQ.Client.IConnection.QueueNameChangedAfterRecoveryAsync -> RabbitMQ.Client.Events.AsyncEventHandler! +RabbitMQ.Client.IConnection.RecoveringConsumerAsync -> RabbitMQ.Client.Events.AsyncEventHandler! +RabbitMQ.Client.IConnection.ConnectionUnblockedAsync -> RabbitMQ.Client.Events.AsyncEventHandler! diff --git a/projects/RabbitMQ.Client/client/api/IConnection.cs b/projects/RabbitMQ.Client/client/api/IConnection.cs index e28bec3c7..3ea28ac77 100644 --- a/projects/RabbitMQ.Client/client/api/IConnection.cs +++ b/projects/RabbitMQ.Client/client/api/IConnection.cs @@ -143,9 +143,7 @@ public interface IConnection : INetworkConnection, IDisposable /// , then this event will be signalled whenever one /// of those event handlers throws an exception, as well. /// - event EventHandler CallbackException; - - event EventHandler ConnectionBlocked; + event AsyncEventHandler CallbackExceptionAsync; /// /// Raised when the connection is destroyed. @@ -155,7 +153,7 @@ public interface IConnection : INetworkConnection, IDisposable /// event handler is added to this event, the event handler /// will be fired immediately. /// - event EventHandler ConnectionShutdown; + event AsyncEventHandler ConnectionShutdownAsync; /// /// Raised when the connection completes recovery. @@ -163,7 +161,7 @@ public interface IConnection : INetworkConnection, IDisposable /// /// This event will never fire for connections that disable automatic recovery. /// - event EventHandler RecoverySucceeded; + event AsyncEventHandler RecoverySucceededAsync; /// /// Raised when the connection recovery fails, e.g. because reconnection or topology @@ -172,7 +170,7 @@ public interface IConnection : INetworkConnection, IDisposable /// /// This event will never fire for connections that disable automatic recovery. /// - event EventHandler ConnectionRecoveryError; + event AsyncEventHandler ConnectionRecoveryErrorAsync; /// /// Raised when the server-generated tag of a consumer registered on this connection changes during @@ -182,7 +180,7 @@ public interface IConnection : INetworkConnection, IDisposable /// /// This event will never fire for connections that disable automatic recovery. /// - event EventHandler ConsumerTagChangeAfterRecovery; + event AsyncEventHandler ConsumerTagChangeAfterRecoveryAsync; /// /// Raised when the name of a server-named queue declared on this connection changes during @@ -192,7 +190,7 @@ public interface IConnection : INetworkConnection, IDisposable /// /// This event will never fire for connections that disable automatic recovery. /// - event EventHandler QueueNameChangedAfterRecovery; + event AsyncEventHandler QueueNameChangedAfterRecoveryAsync; /// /// Raised when a consumer is about to be recovered. This event raises when topology recovery @@ -204,9 +202,17 @@ public interface IConnection : INetworkConnection, IDisposable /// /// This event will never fire for connections that disable automatic recovery. /// - public event EventHandler RecoveringConsumer; + public event AsyncEventHandler RecoveringConsumerAsync; + + /// + /// Raised when a connection is blocked by the AMQP broker. + /// + event AsyncEventHandler ConnectionBlockedAsync; - event EventHandler ConnectionUnblocked; + /// + /// Raised when a connection is unblocked by the AMQP broker. + /// + event AsyncEventHandler ConnectionUnblockedAsync; /// /// This method updates the secret used to authenticate this connection. diff --git a/projects/RabbitMQ.Client/client/framing/Channel.cs b/projects/RabbitMQ.Client/client/framing/Channel.cs index 4642f5376..68b5dc6fc 100644 --- a/projects/RabbitMQ.Client/client/framing/Channel.cs +++ b/projects/RabbitMQ.Client/client/framing/Channel.cs @@ -117,8 +117,8 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella } case ProtocolCommandId.ConnectionBlocked: { - HandleConnectionBlocked(cmd); - return Task.FromResult(true); + // Note: always returns true + return HandleConnectionBlockedAsync(cmd, cancellationToken); } case ProtocolCommandId.ConnectionClose: { @@ -128,7 +128,7 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella case ProtocolCommandId.ConnectionSecure: { // Note: always returns true - return HandleConnectionSecureAsync(cmd); + return HandleConnectionSecureAsync(cmd, cancellationToken); } case ProtocolCommandId.ConnectionStart: { @@ -138,12 +138,12 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella case ProtocolCommandId.ConnectionTune: { // Note: always returns true - return HandleConnectionTuneAsync(cmd); + return HandleConnectionTuneAsync(cmd, cancellationToken); } case ProtocolCommandId.ConnectionUnblocked: { - HandleConnectionUnblocked(); - return Task.FromResult(true); + // Note: always returns true + return HandleConnectionUnblockedAsync(cancellationToken); } default: { diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs index 5d504ebf4..0ffd5773d 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs @@ -46,7 +46,7 @@ internal sealed partial class AutorecoveringConnection private Task? _recoveryTask; private readonly CancellationTokenSource _recoveryCancellationTokenSource = new CancellationTokenSource(); - private void HandleConnectionShutdown(object? _, ShutdownEventArgs args) + private Task HandleConnectionShutdownAsync(object? _, ShutdownEventArgs args) { if (ShouldTriggerConnectionRecovery(args)) { @@ -57,6 +57,8 @@ private void HandleConnectionShutdown(object? _, ShutdownEventArgs args) } } + return Task.CompletedTask; + static bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args) { if (args.Initiator == ShutdownInitiator.Peer) @@ -204,7 +206,8 @@ await RecoverChannelsAndItsConsumersAsync(recordedEntitiesSemaphoreHeld: true, c ESLog.Info("Connection recovery completed"); ThrowIfDisposed(); - _recoverySucceededWrapper.Invoke(this, EventArgs.Empty); + await _recoverySucceededAsyncWrapper.InvokeAsync(this, EventArgs.Empty) + .ConfigureAwait(false); return true; } @@ -266,10 +269,11 @@ await maybeNewInnerConnection.OpenAsync(cancellationToken) { ESLog.Error("Connection recovery exception.", e); // Trigger recovery error events - if (!_connectionRecoveryErrorWrapper.IsEmpty) + if (!_connectionRecoveryErrorAsyncWrapper.IsEmpty) { // Note: recordedEntities semaphore is _NOT_ held at this point - _connectionRecoveryErrorWrapper.Invoke(this, new ConnectionRecoveryErrorEventArgs(e)); + await _connectionRecoveryErrorAsyncWrapper.InvokeAsync(this, new ConnectionRecoveryErrorEventArgs(e)) + .ConfigureAwait(false); } maybeNewInnerConnection?.Dispose(); @@ -377,12 +381,13 @@ await RecordQueueAsync(new RecordedQueue(newName, recordedQueue), recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld, cancellationToken) .ConfigureAwait(false); - if (!_queueNameChangedAfterRecoveryWrapper.IsEmpty) + if (!_queueNameChangedAfterRecoveryAsyncWrapper.IsEmpty) { try { _recordedEntitiesSemaphore.Release(); - _queueNameChangedAfterRecoveryWrapper.Invoke(this, new QueueNameChangedAfterRecoveryEventArgs(oldName, newName)); + await _queueNameChangedAfterRecoveryAsyncWrapper.InvokeAsync(this, new QueueNameChangedAfterRecoveryEventArgs(oldName, newName)) + .ConfigureAwait(false); } finally { @@ -515,7 +520,8 @@ internal async ValueTask RecoverConsumersAsync(AutorecoveringChannel channelToRe try { _recordedEntitiesSemaphore.Release(); - _consumerAboutToBeRecovered.Invoke(this, new RecoveringConsumerEventArgs(consumer.ConsumerTag, consumer.Arguments)); + await _recoveringConsumerAsyncWrapper.InvokeAsync(this, new RecoveringConsumerEventArgs(consumer.ConsumerTag, consumer.Arguments)) + .ConfigureAwait(false); } finally { @@ -531,12 +537,13 @@ await _recordedEntitiesSemaphore.WaitAsync(cancellationToken) RecordedConsumer consumerWithNewConsumerTag = RecordedConsumer.WithNewConsumerTag(newTag, consumer); UpdateConsumer(oldTag, newTag, consumerWithNewConsumerTag); - if (!_consumerTagChangeAfterRecoveryWrapper.IsEmpty) + if (!_consumerTagChangeAfterRecoveryAsyncWrapper.IsEmpty) { try { _recordedEntitiesSemaphore.Release(); - _consumerTagChangeAfterRecoveryWrapper.Invoke(this, new ConsumerTagChangedAfterRecoveryEventArgs(oldTag, newTag)); + await _consumerTagChangeAfterRecoveryAsyncWrapper.InvokeAsync(this, new ConsumerTagChangedAfterRecoveryEventArgs(oldTag, newTag)) + .ConfigureAwait(false); } finally { diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index e3918e4c5..c7315eaf4 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -66,14 +66,25 @@ internal AutorecoveringConnection(ConnectionConfig config, IEndpointResolver end _endpoints = endpoints; _innerConnection = innerConnection; - ConnectionShutdown += HandleConnectionShutdown; - _recoverySucceededWrapper = new EventingWrapper("OnConnectionRecovery", onException); - _connectionRecoveryErrorWrapper = new EventingWrapper("OnConnectionRecoveryError", onException); - _consumerTagChangeAfterRecoveryWrapper = new EventingWrapper("OnConsumerRecovery", onException); - _queueNameChangedAfterRecoveryWrapper = new EventingWrapper("OnQueueRecovery", onException); - - void onException(Exception exception, string context) => - _innerConnection.OnCallbackException(CallbackExceptionEventArgs.Build(exception, context)); + ConnectionShutdownAsync += HandleConnectionShutdownAsync; + + _recoverySucceededAsyncWrapper = + new AsyncEventingWrapper("OnConnectionRecovery", onExceptionAsync); + + _connectionRecoveryErrorAsyncWrapper = + new AsyncEventingWrapper("OnConnectionRecoveryError", onExceptionAsync); + + _consumerTagChangeAfterRecoveryAsyncWrapper = + new AsyncEventingWrapper("OnConsumerRecovery", onExceptionAsync); + + _queueNameChangedAfterRecoveryAsyncWrapper = + new AsyncEventingWrapper("OnQueueRecovery", onExceptionAsync); + + _recoveringConsumerAsyncWrapper = + new AsyncEventingWrapper("OnRecoveringConsumer", onExceptionAsync); + + Task onExceptionAsync(Exception exception, string context) => + _innerConnection.OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context)); } internal static async ValueTask CreateAsync(ConnectionConfig config, IEndpointResolver endpoints, @@ -88,64 +99,64 @@ await innerConnection.OpenAsync(cancellationToken) return connection; } - public event EventHandler RecoverySucceeded + public event AsyncEventHandler RecoverySucceededAsync { - add => _recoverySucceededWrapper.AddHandler(value); - remove => _recoverySucceededWrapper.RemoveHandler(value); + add => _recoverySucceededAsyncWrapper.AddHandler(value); + remove => _recoverySucceededAsyncWrapper.RemoveHandler(value); } - private EventingWrapper _recoverySucceededWrapper; + private AsyncEventingWrapper _recoverySucceededAsyncWrapper; - public event EventHandler ConnectionRecoveryError + public event AsyncEventHandler ConnectionRecoveryErrorAsync { - add => _connectionRecoveryErrorWrapper.AddHandler(value); - remove => _connectionRecoveryErrorWrapper.RemoveHandler(value); + add => _connectionRecoveryErrorAsyncWrapper.AddHandler(value); + remove => _connectionRecoveryErrorAsyncWrapper.RemoveHandler(value); } - private EventingWrapper _connectionRecoveryErrorWrapper; + private AsyncEventingWrapper _connectionRecoveryErrorAsyncWrapper; - public event EventHandler CallbackException + public event AsyncEventHandler CallbackExceptionAsync { - add => InnerConnection.CallbackException += value; - remove => InnerConnection.CallbackException -= value; + add => InnerConnection.CallbackExceptionAsync += value; + remove => InnerConnection.CallbackExceptionAsync -= value; } - public event EventHandler ConnectionBlocked + public event AsyncEventHandler ConnectionBlockedAsync { - add => InnerConnection.ConnectionBlocked += value; - remove => InnerConnection.ConnectionBlocked -= value; + add => InnerConnection.ConnectionBlockedAsync += value; + remove => InnerConnection.ConnectionBlockedAsync -= value; } - public event EventHandler ConnectionShutdown + public event AsyncEventHandler ConnectionShutdownAsync { - add => InnerConnection.ConnectionShutdown += value; - remove => InnerConnection.ConnectionShutdown -= value; + add => InnerConnection.ConnectionShutdownAsync += value; + remove => InnerConnection.ConnectionShutdownAsync -= value; } - public event EventHandler ConnectionUnblocked + public event AsyncEventHandler ConnectionUnblockedAsync { - add => InnerConnection.ConnectionUnblocked += value; - remove => InnerConnection.ConnectionUnblocked -= value; + add => InnerConnection.ConnectionUnblockedAsync += value; + remove => InnerConnection.ConnectionUnblockedAsync -= value; } - public event EventHandler ConsumerTagChangeAfterRecovery + public event AsyncEventHandler ConsumerTagChangeAfterRecoveryAsync { - add => _consumerTagChangeAfterRecoveryWrapper.AddHandler(value); - remove => _consumerTagChangeAfterRecoveryWrapper.RemoveHandler(value); + add => _consumerTagChangeAfterRecoveryAsyncWrapper.AddHandler(value); + remove => _consumerTagChangeAfterRecoveryAsyncWrapper.RemoveHandler(value); } - private EventingWrapper _consumerTagChangeAfterRecoveryWrapper; + private AsyncEventingWrapper _consumerTagChangeAfterRecoveryAsyncWrapper; - public event EventHandler QueueNameChangedAfterRecovery + public event AsyncEventHandler QueueNameChangedAfterRecoveryAsync { - add => _queueNameChangedAfterRecoveryWrapper.AddHandler(value); - remove => _queueNameChangedAfterRecoveryWrapper.RemoveHandler(value); + add => _queueNameChangedAfterRecoveryAsyncWrapper.AddHandler(value); + remove => _queueNameChangedAfterRecoveryAsyncWrapper.RemoveHandler(value); } - private EventingWrapper _queueNameChangedAfterRecoveryWrapper; + private AsyncEventingWrapper _queueNameChangedAfterRecoveryAsyncWrapper; - public event EventHandler RecoveringConsumer + public event AsyncEventHandler RecoveringConsumerAsync { - add => _consumerAboutToBeRecovered.AddHandler(value); - remove => _consumerAboutToBeRecovered.RemoveHandler(value); + add => _recoveringConsumerAsyncWrapper.AddHandler(value); + remove => _recoveringConsumerAsyncWrapper.RemoveHandler(value); } - private EventingWrapper _consumerAboutToBeRecovered; + private AsyncEventingWrapper _recoveringConsumerAsyncWrapper; public string? ClientProvidedName => _config.ClientProvidedName; diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 225ba9554..fa2daead4 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -701,10 +701,12 @@ await ModelSendAsync(method, cancellationToken). return true; } - protected void HandleConnectionBlocked(IncomingCommand cmd) + protected async Task HandleConnectionBlockedAsync(IncomingCommand cmd, CancellationToken cancellationToken) { string reason = new ConnectionBlocked(cmd.MethodSpan)._reason; - Session.Connection.HandleConnectionBlocked(reason); + await Session.Connection.HandleConnectionBlockedAsync(reason) + .ConfigureAwait(false); + return true; } protected async Task HandleConnectionCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) @@ -713,7 +715,8 @@ protected async Task HandleConnectionCloseAsync(IncomingCommand cmd, Cance var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId); try { - Session.Connection.ClosedViaPeer(reason); + await Session.Connection.ClosedViaPeerAsync(reason) + .ConfigureAwait(false); var replyMethod = new ConnectionCloseOk(); await ModelSendAsync(replyMethod, cancellationToken) @@ -735,7 +738,7 @@ await ModelSendAsync(replyMethod, cancellationToken) return true; } - protected async Task HandleConnectionSecureAsync(IncomingCommand _) + protected async Task HandleConnectionSecureAsync(IncomingCommand cmd, CancellationToken cancellationToken) { var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next(); await k.HandleCommandAsync(new IncomingCommand()) @@ -765,7 +768,7 @@ await Session.Connection.CloseAsync(reason, false, return true; } - protected async Task HandleConnectionTuneAsync(IncomingCommand cmd) + protected async Task HandleConnectionTuneAsync(IncomingCommand cmd, CancellationToken cancellationToken) { // Note: `using` here to ensure instance is disposed using var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next(); @@ -777,9 +780,11 @@ await k.HandleCommandAsync(cmd) return true; } - protected void HandleConnectionUnblocked() + protected async Task HandleConnectionUnblockedAsync(CancellationToken cancellationToken) { - Session.Connection.HandleConnectionUnblocked(); + await Session.Connection.HandleConnectionUnblockedAsync() + .ConfigureAwait(false); + return true; } public async ValueTask GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs index 2cdc180df..0a954b89e 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs @@ -54,20 +54,22 @@ internal void NotifyReceivedCloseOk() _closed = true; } - internal void HandleConnectionBlocked(string reason) + internal Task HandleConnectionBlockedAsync(string reason) { - if (!_connectionBlockedWrapper.IsEmpty) + if (!_connectionBlockedAsyncWrapper.IsEmpty) { - _connectionBlockedWrapper.Invoke(this, new ConnectionBlockedEventArgs(reason)); + return _connectionBlockedAsyncWrapper.InvokeAsync(this, new ConnectionBlockedEventArgs(reason)); } + return Task.CompletedTask; } - internal void HandleConnectionUnblocked() + internal Task HandleConnectionUnblockedAsync() { - if (!_connectionUnblockedWrapper.IsEmpty) + if (!_connectionUnblockedAsyncWrapper.IsEmpty) { - _connectionUnblockedWrapper.Invoke(this, EventArgs.Empty); + return _connectionUnblockedAsyncWrapper.InvokeAsync(this, EventArgs.Empty); } + return Task.CompletedTask; } private async ValueTask StartAndTuneAsync(CancellationToken cancellationToken) diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs b/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs index e5588d39c..881d7c2c4 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs @@ -108,7 +108,8 @@ private async void HeartbeatReadTimerCallback(object? state) { var eose = new EndOfStreamException($"Heartbeat missing with heartbeat == {_heartbeat} seconds"); LogCloseError(eose.Message, eose); - HandleMainLoopException(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose)); + await HandleMainLoopExceptionAsync(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose)) + .ConfigureAwait(false); shouldTerminate = true; } } diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs index c3a90f8ec..f9ecbd918 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs @@ -62,7 +62,8 @@ await ReceiveLoopAsync(mainLoopToken) Constants.InternalError, "Thread aborted (AppDomain unloaded?)", exception: taex); - HandleMainLoopException(ea); + await HandleMainLoopExceptionAsync(ea) + .ConfigureAwait(false); } #endif catch (EndOfStreamException eose) @@ -72,7 +73,8 @@ await ReceiveLoopAsync(mainLoopToken) 0, "End of stream", exception: eose); - HandleMainLoopException(ea); + await HandleMainLoopExceptionAsync(ea) + .ConfigureAwait(false); } catch (HardProtocolException hpe) { @@ -89,7 +91,8 @@ await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken) Constants.InternalError, fileLoadException.Message, exception: fileLoadException); - HandleMainLoopException(ea); + await HandleMainLoopExceptionAsync(ea) + .ConfigureAwait(false); } catch (OperationCanceledException ocex) { @@ -103,7 +106,8 @@ await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken) Constants.InternalError, ocex.Message, exception: ocex); - HandleMainLoopException(ea); + await HandleMainLoopExceptionAsync(ea) + .ConfigureAwait(false); } } catch (Exception ex) @@ -112,7 +116,8 @@ await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken) Constants.InternalError, ex.Message, exception: ex); - HandleMainLoopException(ea); + await HandleMainLoopExceptionAsync(ea) + .ConfigureAwait(false); } using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionCloseTimeout); @@ -202,7 +207,7 @@ private void MaybeTerminateMainloopAndStopHeartbeatTimers(bool cancelMainLoop = MaybeStopHeartbeatTimers(); } - private void HandleMainLoopException(ShutdownEventArgs reason) + private async Task HandleMainLoopExceptionAsync(ShutdownEventArgs reason) { string message = reason.GetLogMessage(); if (false == SetCloseReason(reason)) @@ -213,7 +218,7 @@ private void HandleMainLoopException(ShutdownEventArgs reason) _channel0.MaybeSetConnectionStartException(reason.Exception!); - OnShutdown(reason); + await OnShutdownAsync(reason).ConfigureAwait(false); LogCloseError($"unexpected connection closure: {message}", reason.Exception!); } @@ -222,7 +227,7 @@ private async Task HardProtocolExceptionHandlerAsync(HardProtocolException hpe, { if (SetCloseReason(hpe.ShutdownReason)) { - OnShutdown(hpe.ShutdownReason); + await OnShutdownAsync(hpe.ShutdownReason).ConfigureAwait(false); await _session0.SetSessionClosingAsync(false) .ConfigureAwait(false); try diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 8e8065b8d..4c184cf25 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -64,11 +64,18 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler) _config = config; _frameHandler = frameHandler; - Action onException = (exception, context) => OnCallbackException(CallbackExceptionEventArgs.Build(exception, context)); - _callbackExceptionWrapper = new EventingWrapper(string.Empty, (exception, context) => { }); - _connectionBlockedWrapper = new EventingWrapper("OnConnectionBlocked", onException); - _connectionUnblockedWrapper = new EventingWrapper("OnConnectionUnblocked", onException); - _connectionShutdownWrapper = new EventingWrapper("OnShutdown", onException); + _callbackExceptionAsyncWrapper = + new AsyncEventingWrapper(string.Empty, + (exception, context) => Task.CompletedTask); + + _connectionBlockedAsyncWrapper = + new AsyncEventingWrapper("OnConnectionBlocked", onExceptionAsync); + + _connectionUnblockedAsyncWrapper = + new AsyncEventingWrapper("OnConnectionUnblocked", onExceptionAsync); + + _connectionShutdownAsyncWrapper = + new AsyncEventingWrapper("OnShutdown", onExceptionAsync); _sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize); _session0 = new MainSession(this, config.MaxInboundMessageBodySize); @@ -81,6 +88,9 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler) }; _mainLoopTask = Task.CompletedTask; + + Task onExceptionAsync(Exception exception, string context) => + OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context)); } public Guid Id => _id; @@ -118,35 +128,35 @@ internal IFrameHandler FrameHandler get { return _frameHandler; } } - public event EventHandler CallbackException + public event AsyncEventHandler CallbackExceptionAsync { - add => _callbackExceptionWrapper.AddHandler(value); - remove => _callbackExceptionWrapper.RemoveHandler(value); + add => _callbackExceptionAsyncWrapper.AddHandler(value); + remove => _callbackExceptionAsyncWrapper.RemoveHandler(value); } - private EventingWrapper _callbackExceptionWrapper; + private AsyncEventingWrapper _callbackExceptionAsyncWrapper; - public event EventHandler ConnectionBlocked + public event AsyncEventHandler ConnectionBlockedAsync { - add => _connectionBlockedWrapper.AddHandler(value); - remove => _connectionBlockedWrapper.RemoveHandler(value); + add => _connectionBlockedAsyncWrapper.AddHandler(value); + remove => _connectionBlockedAsyncWrapper.RemoveHandler(value); } - private EventingWrapper _connectionBlockedWrapper; + private AsyncEventingWrapper _connectionBlockedAsyncWrapper; - public event EventHandler ConnectionUnblocked + public event AsyncEventHandler ConnectionUnblockedAsync { - add => _connectionUnblockedWrapper.AddHandler(value); - remove => _connectionUnblockedWrapper.RemoveHandler(value); + add => _connectionUnblockedAsyncWrapper.AddHandler(value); + remove => _connectionUnblockedAsyncWrapper.RemoveHandler(value); } - private EventingWrapper _connectionUnblockedWrapper; + private AsyncEventingWrapper _connectionUnblockedAsyncWrapper; - public event EventHandler RecoveringConsumer + public event AsyncEventHandler RecoveringConsumerAsync { - add => _consumerAboutToBeRecovered.AddHandler(value); - remove => _consumerAboutToBeRecovered.RemoveHandler(value); + add => _consumerAboutToBeRecoveredAsyncWrapper.AddHandler(value); + remove => _consumerAboutToBeRecoveredAsyncWrapper.RemoveHandler(value); } - private EventingWrapper _consumerAboutToBeRecovered; + private AsyncEventingWrapper _consumerAboutToBeRecoveredAsyncWrapper; - public event EventHandler ConnectionShutdown + public event AsyncEventHandler ConnectionShutdownAsync { add { @@ -154,7 +164,7 @@ public event EventHandler ConnectionShutdown ShutdownEventArgs? reason = CloseReason; if (reason is null) { - _connectionShutdownWrapper.AddHandler(value); + _connectionShutdownAsyncWrapper.AddHandler(value); } else { @@ -164,15 +174,15 @@ public event EventHandler ConnectionShutdown remove { ThrowIfDisposed(); - _connectionShutdownWrapper.RemoveHandler(value); + _connectionShutdownAsyncWrapper.RemoveHandler(value); } } - private EventingWrapper _connectionShutdownWrapper; + private AsyncEventingWrapper _connectionShutdownAsyncWrapper; /// /// This event is never fired by non-recovering connections but it is a part of the interface. /// - public event EventHandler RecoverySucceeded + public event AsyncEventHandler RecoverySucceededAsync { add { } remove { } @@ -181,7 +191,7 @@ public event EventHandler RecoverySucceeded /// /// This event is never fired by non-recovering connections but it is a part of the interface. /// - public event EventHandler ConnectionRecoveryError + public event AsyncEventHandler ConnectionRecoveryErrorAsync { add { } remove { } @@ -190,7 +200,7 @@ public event EventHandler ConnectionRecoveryEr /// /// This event is never fired by non-recovering connections but it is a part of the interface. /// - public event EventHandler ConsumerTagChangeAfterRecovery + public event AsyncEventHandler ConsumerTagChangeAfterRecoveryAsync { add { } remove { } @@ -199,7 +209,7 @@ public event EventHandler ConsumerTagC /// /// This event is never fired by non-recovering connections but it is a part of the interface. /// - public event EventHandler QueueNameChangedAfterRecovery + public event AsyncEventHandler QueueNameChangedAfterRecoveryAsync { add { } remove { } @@ -207,10 +217,11 @@ public event EventHandler QueueNameChang internal void TakeOver(Connection other) { - _callbackExceptionWrapper.Takeover(other._callbackExceptionWrapper); - _connectionBlockedWrapper.Takeover(other._connectionBlockedWrapper); - _connectionUnblockedWrapper.Takeover(other._connectionUnblockedWrapper); - _connectionShutdownWrapper.Takeover(other._connectionShutdownWrapper); + _callbackExceptionAsyncWrapper.Takeover(other._callbackExceptionAsyncWrapper); + _connectionBlockedAsyncWrapper.Takeover(other._connectionBlockedAsyncWrapper); + _connectionUnblockedAsyncWrapper.Takeover(other._connectionUnblockedAsyncWrapper); + _connectionShutdownAsyncWrapper.Takeover(other._connectionShutdownAsyncWrapper); + _consumerAboutToBeRecoveredAsyncWrapper.Takeover(other._consumerAboutToBeRecoveredAsyncWrapper); } internal async ValueTask OpenAsync(CancellationToken cancellationToken) @@ -319,7 +330,8 @@ internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan ti { cancellationToken.ThrowIfCancellationRequested(); - OnShutdown(reason); + await OnShutdownAsync(reason) + .ConfigureAwait(false); await _session0.SetSessionClosingAsync(false) .ConfigureAwait(false); @@ -399,7 +411,7 @@ await _frameHandler.CloseAsync(cancellationToken) } } - internal void ClosedViaPeer(ShutdownEventArgs reason) + internal async Task ClosedViaPeerAsync(ShutdownEventArgs reason) { if (false == SetCloseReason(reason)) { @@ -410,8 +422,10 @@ internal void ClosedViaPeer(ShutdownEventArgs reason) // We are quiescing, but still allow for server-close } - OnShutdown(reason); - _session0.SetSessionClosing(true); + await OnShutdownAsync(reason) + .ConfigureAwait(false); + await _session0.SetSessionClosingAsync(true) + .ConfigureAwait(false); MaybeTerminateMainloopAndStopHeartbeatTimers(cancelMainLoop: true); } @@ -429,10 +443,10 @@ private async Task FinishCloseAsync(CancellationToken cancellationToken) } ///Broadcasts notification of the final shutdown of the connection. - private void OnShutdown(ShutdownEventArgs reason) + private Task OnShutdownAsync(ShutdownEventArgs reason) { ThrowIfDisposed(); - _connectionShutdownWrapper.Invoke(this, reason); + return _connectionShutdownAsyncWrapper.InvokeAsync(this, reason); } private bool SetCloseReason(ShutdownEventArgs reason) @@ -458,9 +472,9 @@ private void LogCloseError(string error, Exception ex) } } - internal void OnCallbackException(CallbackExceptionEventArgs args) + internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args) { - _callbackExceptionWrapper.Invoke(this, args); + return _callbackExceptionAsyncWrapper.InvokeAsync(this, args); } internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellationToken) diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs index b5dea9b14..672b9d527 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs @@ -53,7 +53,7 @@ protected SessionBase(Connection connection, ushort channelNumber) ChannelNumber = channelNumber; if (channelNumber != 0) { - connection.ConnectionShutdown += OnConnectionShutdown; + connection.ConnectionShutdownAsync += OnConnectionShutdownAsync; } RabbitMqClientEventSource.Log.ChannelOpened(); } @@ -86,14 +86,15 @@ public event EventHandler SessionShutdown [MemberNotNullWhen(false, nameof(CloseReason))] public bool IsOpen => CloseReason is null; - public virtual void OnConnectionShutdown(object? conn, ShutdownEventArgs reason) + public Task OnConnectionShutdownAsync(object? conn, ShutdownEventArgs reason) { Close(reason); + return Task.CompletedTask; } - public virtual void OnSessionShutdown(ShutdownEventArgs reason) + public void OnSessionShutdown(ShutdownEventArgs reason) { - Connection.ConnectionShutdown -= OnConnectionShutdown; + Connection.ConnectionShutdownAsync -= OnConnectionShutdownAsync; _sessionShutdownWrapper.Invoke(this, reason); } diff --git a/projects/Test/Applications/MassPublish/Program.cs b/projects/Test/Applications/MassPublish/Program.cs index b0785c0fa..980212d34 100644 --- a/projects/Test/Applications/MassPublish/Program.cs +++ b/projects/Test/Applications/MassPublish/Program.cs @@ -68,7 +68,7 @@ static Program() static async Task Main() { using IConnection consumeConnection = await s_consumeConnectionFactory.CreateConnectionAsync(); - consumeConnection.ConnectionShutdown += Connection_ConnectionShutdown; + consumeConnection.ConnectionShutdownAsync += ConnectionShutdownAsync; using IChannel consumeChannel = await consumeConnection.CreateChannelAsync(); consumeChannel.ChannelShutdown += Channel_ChannelShutdown; @@ -92,7 +92,7 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer for (int i = 0; i < ConnectionCount; i++) { IConnection publishConnection = await s_publishConnectionFactory.CreateConnectionAsync($"{AppId}-PUBLISH-{i}"); - publishConnection.ConnectionShutdown += Connection_ConnectionShutdown; + publishConnection.ConnectionShutdownAsync += ConnectionShutdownAsync; publishConnections.Add(publishConnection); } @@ -163,13 +163,14 @@ private static void PublishChannel_BasicNacks(object sender, BasicNackEventArgs Console.Error.WriteLine("[ERROR] unexpected nack on publish: {0}", e); } - private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e) + private static Task ConnectionShutdownAsync(object sender, ShutdownEventArgs e) { if (e.Initiator != ShutdownInitiator.Application) { Console.Error.WriteLine("[ERROR] unexpected connection shutdown: {0}", e); s_consumeDoneEvent.TrySetResult(false); } + return Task.CompletedTask; } private static void Channel_ChannelShutdown(object sender, ShutdownEventArgs e) diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs index ea447bdb5..6bb1ce044 100644 --- a/projects/Test/Common/IntegrationFixture.cs +++ b/projects/Test/Common/IntegrationFixture.cs @@ -225,7 +225,7 @@ protected void AddCallbackExceptionHandlers(IConnection conn, IChannel channel) { if (conn != null) { - conn.ConnectionRecoveryError += (s, ea) => + conn.ConnectionRecoveryErrorAsync += (s, ea) => { _connectionRecoveryException = ea.Exception; @@ -237,9 +237,10 @@ protected void AddCallbackExceptionHandlers(IConnection conn, IChannel channel) catch (InvalidOperationException) { } + return Task.CompletedTask; }; - conn.CallbackException += (o, ea) => + conn.CallbackExceptionAsync += (o, ea) => { _connectionCallbackException = ea.Exception; @@ -251,6 +252,7 @@ protected void AddCallbackExceptionHandlers(IConnection conn, IChannel channel) catch (InvalidOperationException) { } + return Task.CompletedTask; }; } @@ -276,7 +278,7 @@ protected void AddCallbackShutdownHandlers() { if (_conn != null) { - _conn.ConnectionShutdown += (o, ea) => + _conn.ConnectionShutdownAsync += (o, ea) => { HandleConnectionShutdown(_conn, ea, (args) => { @@ -289,6 +291,7 @@ protected void AddCallbackShutdownHandlers() { } }); + return Task.CompletedTask; }; } @@ -528,13 +531,14 @@ protected ConnectionFactory CreateConnectionFactory( }; } - protected void HandleConnectionShutdown(object sender, ShutdownEventArgs args) + protected Task HandleConnectionShutdownAsync(object sender, ShutdownEventArgs args) { if (args.Initiator != ShutdownInitiator.Application) { IConnection conn = (IConnection)sender; _output.WriteLine($"{_testDisplayName} connection {conn.ClientProvidedName} shut down: {args}"); } + return Task.CompletedTask; } protected void HandleConnectionShutdown(IConnection conn, ShutdownEventArgs args, Action a) @@ -625,7 +629,11 @@ protected static TaskCompletionSource PrepareForRecovery(IConnection conn) var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); AutorecoveringConnection aconn = conn as AutorecoveringConnection; - aconn.RecoverySucceeded += (source, ea) => tcs.TrySetResult(true); + aconn.RecoverySucceededAsync += (source, ea) => + { + tcs.TrySetResult(true); + return Task.CompletedTask; + }; return tcs; } diff --git a/projects/Test/Common/TestConnectionRecoveryBase.cs b/projects/Test/Common/TestConnectionRecoveryBase.cs index de581f0c4..150e919e3 100644 --- a/projects/Test/Common/TestConnectionRecoveryBase.cs +++ b/projects/Test/Common/TestConnectionRecoveryBase.cs @@ -229,7 +229,11 @@ protected static TaskCompletionSource PrepareForShutdown(IConnection conn) var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); AutorecoveringConnection aconn = conn as AutorecoveringConnection; - aconn.ConnectionShutdown += (c, args) => tcs.TrySetResult(true); + aconn.ConnectionShutdownAsync += (c, args) => + { + tcs.TrySetResult(true); + return Task.CompletedTask; + }; return tcs; } diff --git a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs index 0ab41bc12..214637043 100644 --- a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs +++ b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs @@ -56,7 +56,11 @@ public async Task TestRecoveringConsumerEventHandlers_Called(int iterations) await _channel.BasicConsumeAsync(q, true, cons); int counter = 0; - ((AutorecoveringConnection)_conn).RecoveringConsumer += (sender, args) => Interlocked.Increment(ref counter); + ((AutorecoveringConnection)_conn).RecoveringConsumerAsync += (sender, args) => + { + Interlocked.Increment(ref counter); + return Task.CompletedTask; + }; for (int i = 0; i < iterations; i++) { @@ -84,13 +88,14 @@ public async Task TestRecoveringConsumerEventHandler_EventArgumentsArePassedDown bool ctagMatches = false; bool consumerArgumentMatches = false; - ((AutorecoveringConnection)_conn).RecoveringConsumer += (sender, args) => + ((AutorecoveringConnection)_conn).RecoveringConsumerAsync += (sender, args) => { // We cannot assert here because NUnit throws when an assertion fails. This exception is caught and // passed to a CallbackExceptionHandler, instead of failing the test. Instead, we have to do this trick // and assert in the test function. ctagMatches = args.ConsumerTag == expectedCTag; consumerArgumentMatches = (string)args.ConsumerArguments[key] == value; + return Task.CompletedTask; }; await CloseAndWaitForRecoveryAsync(); diff --git a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoverySucceededEventHandlers.cs b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoverySucceededEventHandlers.cs index ef7ba04ce..c6d64fd7c 100644 --- a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoverySucceededEventHandlers.cs +++ b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoverySucceededEventHandlers.cs @@ -47,7 +47,11 @@ public TestRecoverySucceededEventHandlers(ITestOutputHelper output) : base(outpu public async Task TestRecoverySucceededEventHandlers_Called() { int counter = 0; - ((AutorecoveringConnection)_conn).RecoverySucceeded += (source, ea) => Interlocked.Increment(ref counter); + ((AutorecoveringConnection)_conn).RecoverySucceededAsync += (source, ea) => + { + Interlocked.Increment(ref counter); + return Task.CompletedTask; + }; await CloseAndWaitForRecoveryAsync(); await CloseAndWaitForRecoveryAsync(); diff --git a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestShutdownEventHandlers.cs b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestShutdownEventHandlers.cs index 03d5ad34b..b4b509a45 100644 --- a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestShutdownEventHandlers.cs +++ b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestShutdownEventHandlers.cs @@ -46,7 +46,11 @@ public TestShutdownEventHandlers(ITestOutputHelper output) : base(output) public async Task TestShutdownEventHandlers_Called() { int counter = 0; - _conn.ConnectionShutdown += (c, args) => Interlocked.Increment(ref counter); + _conn.ConnectionShutdownAsync += (c, args) => + { + Interlocked.Increment(ref counter); + return Task.CompletedTask; + }; Assert.True(_conn.IsOpen); await CloseAndWaitForRecoveryAsync(); diff --git a/projects/Test/Integration/ConnectionRecovery/TestConsumerRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestConsumerRecovery.cs index da4c46556..4f627cfc8 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestConsumerRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestConsumerRecovery.cs @@ -58,7 +58,11 @@ public async Task TestConsumerRecoveryWithManyConsumers() } var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - ((AutorecoveringConnection)_conn).ConsumerTagChangeAfterRecovery += (prev, current) => tcs.TrySetResult(true); + ((AutorecoveringConnection)_conn).ConsumerTagChangeAfterRecoveryAsync += (prev, current) => + { + tcs.TrySetResult(true); + return Task.CompletedTask; + }; await CloseAndWaitForRecoveryAsync(); await WaitAsync(tcs, "consumer tag change after recovery"); diff --git a/projects/Test/Integration/ConnectionRecovery/TestQueueRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestQueueRecovery.cs index a75961163..dd21340be 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestQueueRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestQueueRecovery.cs @@ -76,8 +76,16 @@ public async Task TestServerNamedQueueRecovery() var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var connection = (AutorecoveringConnection)_conn; - connection.RecoverySucceeded += (source, ea) => tcs.SetResult(true); - connection.QueueNameChangedAfterRecovery += (source, ea) => { nameAfter = ea.NameAfter; }; + connection.RecoverySucceededAsync += (source, ea) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; + connection.QueueNameChangedAfterRecoveryAsync += (source, ea) => + { + nameAfter = ea.NameAfter; + return Task.CompletedTask; + }; await CloseAndWaitForRecoveryAsync(); await WaitAsync(tcs, "recovery succeeded"); diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 2edda15c7..8e7b59405 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -87,12 +87,13 @@ public async Task TestBasicRoundtripConcurrent() bool body2Received = false; try { - _conn.ConnectionShutdown += (o, ea) => + _conn.ConnectionShutdownAsync += (o, ea) => { HandleConnectionShutdown(_conn, ea, (args) => { MaybeSetException(args, publish1SyncSource, publish2SyncSource); }); + return Task.CompletedTask; }; _channel.ChannelShutdown += (o, ea) => @@ -179,12 +180,13 @@ public async Task TestBasicRoundtripConcurrentManyMessages() try { - _conn.ConnectionShutdown += (o, ea) => + _conn.ConnectionShutdownAsync += (o, ea) => { HandleConnectionShutdown(_conn, ea, (args) => { MaybeSetException(args, publish1SyncSource, publish2SyncSource); }); + return Task.CompletedTask; }; _channel.ChannelShutdown += (o, ea) => @@ -202,12 +204,13 @@ public async Task TestBasicRoundtripConcurrentManyMessages() { using (IConnection publishConn = await _connFactory.CreateConnectionAsync()) { - publishConn.ConnectionShutdown += (o, ea) => + publishConn.ConnectionShutdownAsync += (o, ea) => { HandleConnectionShutdown(publishConn, ea, (args) => { MaybeSetException(args, publish1SyncSource, publish2SyncSource); }); + return Task.CompletedTask; }; using (IChannel publishChannel = await publishConn.CreateChannelAsync()) { @@ -245,12 +248,13 @@ public async Task TestBasicRoundtripConcurrentManyMessages() { using (IConnection consumeConn = await _connFactory.CreateConnectionAsync()) { - consumeConn.ConnectionShutdown += (o, ea) => + consumeConn.ConnectionShutdownAsync += (o, ea) => { HandleConnectionShutdown(consumeConn, ea, (args) => { MaybeSetException(ea, publish1SyncSource, publish2SyncSource); }); + return Task.CompletedTask; }; using (IChannel consumeChannel = await consumeConn.CreateChannelAsync()) { @@ -343,12 +347,13 @@ public async Task TestBasicRejectAsync() try { - _conn.ConnectionShutdown += (o, ea) => + _conn.ConnectionShutdownAsync += (o, ea) => { HandleConnectionShutdown(_conn, ea, (args) => { MaybeSetException(args, publishSyncSource); }); + return Task.CompletedTask; }; _channel.ChannelShutdown += (o, ea) => @@ -439,12 +444,13 @@ public async Task TestBasicAckAsync() var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _conn.ConnectionShutdown += (o, ea) => + _conn.ConnectionShutdownAsync += (o, ea) => { HandleConnectionShutdown(_conn, ea, (args) => { MaybeSetException(args, publishSyncSource); }); + return Task.CompletedTask; }; _channel.ChannelShutdown += (o, ea) => @@ -503,12 +509,13 @@ public async Task TestBasicNackAsync() var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _conn.ConnectionShutdown += (o, ea) => + _conn.ConnectionShutdownAsync += (o, ea) => { HandleConnectionShutdown(_conn, ea, (args) => { MaybeSetException(ea, publishSyncSource); }); + return Task.CompletedTask; }; _channel.ChannelShutdown += (o, ea) => @@ -609,12 +616,13 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() tcs.SetCanceled(); }); - _conn.ConnectionShutdown += (o, ea) => + _conn.ConnectionShutdownAsync += (o, ea) => { HandleConnectionShutdown(_conn, ea, (args) => { MaybeSetException(ea, tcs); }); + return Task.CompletedTask; }; _channel.ChannelShutdown += (o, ea) => diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs index c4266bc3d..648dcbf45 100644 --- a/projects/Test/Integration/TestBasicPublish.cs +++ b/projects/Test/Integration/TestBasicPublish.cs @@ -202,9 +202,10 @@ public async Task TestMaxInboundMessageBodySize() using (IConnection conn = await cf.CreateConnectionAsync()) { - conn.ConnectionShutdown += (o, a) => + conn.ConnectionShutdownAsync += (o, a) => { sawConnectionShutdown = true; + return Task.CompletedTask; }; Assert.Equal(maxMsgSize, cf.MaxInboundMessageBodySize); diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs index 0d6492360..0ab553229 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs @@ -50,7 +50,7 @@ public override async Task InitializeAsync() { _connFactory = CreateConnectionFactory(); _conn = await _connFactory.CreateConnectionAsync(); - _conn.ConnectionShutdown += HandleConnectionShutdown; + _conn.ConnectionShutdownAsync += HandleConnectionShutdownAsync; // NB: not creating _channel because this test suite doesn't use it. Assert.Null(_channel); } diff --git a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs index c13d66ee4..33afd2288 100644 --- a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs +++ b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs @@ -167,7 +167,11 @@ public async Task TestConsumerRecoveryOnClientNamedQueueWithOneRecovery() await AssertConsumerCountAsync(ch, q1, 1); bool queueNameChangeAfterRecoveryCalled = false; - c.QueueNameChangedAfterRecovery += (source, ea) => { queueNameChangeAfterRecoveryCalled = true; }; + c.QueueNameChangedAfterRecoveryAsync += (source, ea) => + { + queueNameChangeAfterRecoveryCalled = true; + return Task.CompletedTask; + }; // connection #2 await CloseAndWaitForRecoveryAsync(c); @@ -222,11 +226,12 @@ public async Task TestConsumerRecoveryWithServerNamedQueue() bool queueNameBeforeIsEqual = false; bool queueNameChangeAfterRecoveryCalled = false; string qnameAfterRecovery = null; - c.QueueNameChangedAfterRecovery += (source, ea) => + c.QueueNameChangedAfterRecoveryAsync += (source, ea) => { queueNameChangeAfterRecoveryCalled = true; queueNameBeforeIsEqual = qname.Equals(ea.NameBefore); qnameAfterRecovery = ea.NameAfter; + return Task.CompletedTask; }; await CloseAndWaitForRecoveryAsync(c); @@ -285,9 +290,21 @@ public async Task TestTopologyRecoveryConsumerFilter() using (AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryFilterAsync(filter)) { - conn.RecoverySucceeded += (source, ea) => connectionRecoveryTcs.SetResult(true); - conn.ConnectionRecoveryError += (source, ea) => connectionRecoveryTcs.SetException(ea.Exception); - conn.CallbackException += (source, ea) => connectionRecoveryTcs.SetException(ea.Exception); + conn.RecoverySucceededAsync += (source, ea) => + { + connectionRecoveryTcs.SetResult(true); + return Task.CompletedTask; + }; + conn.ConnectionRecoveryErrorAsync += (source, ea) => + { + connectionRecoveryTcs.SetException(ea.Exception); + return Task.CompletedTask; + }; + conn.CallbackExceptionAsync += (source, ea) => + { + connectionRecoveryTcs.SetException(ea.Exception); + return Task.CompletedTask; + }; using (IChannel ch = await conn.CreateChannelAsync()) { diff --git a/projects/Test/Integration/TestConnectionTopologyRecovery.cs b/projects/Test/Integration/TestConnectionTopologyRecovery.cs index 791e52e2d..a49935911 100644 --- a/projects/Test/Integration/TestConnectionTopologyRecovery.cs +++ b/projects/Test/Integration/TestConnectionTopologyRecovery.cs @@ -96,7 +96,11 @@ public async Task TestTopologyRecoveryQueueFilter() }; AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryFilterAsync(filter); - conn.RecoverySucceeded += (source, ea) => tcs.SetResult(true); + conn.RecoverySucceededAsync += (source, ea) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; IChannel ch = await conn.CreateChannelAsync(); string queueToRecover = "recovered.queue"; @@ -142,7 +146,11 @@ public async Task TestTopologyRecoveryExchangeFilter() }; AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryFilterAsync(filter); - conn.RecoverySucceeded += (source, ea) => tcs.SetResult(true); + conn.RecoverySucceededAsync += (source, ea) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; IChannel ch = await conn.CreateChannelAsync(); try { @@ -186,7 +194,11 @@ public async Task TestTopologyRecoveryBindingFilter() }; AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryFilterAsync(filter); - conn.RecoverySucceeded += (source, ea) => tcs.SetResult(true); + conn.RecoverySucceededAsync += (source, ea) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; IChannel ch = await conn.CreateChannelAsync(); @@ -237,9 +249,21 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() var connectionRecoveryTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var filter = new TopologyRecoveryFilter(); AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryFilterAsync(filter); - conn.RecoverySucceeded += (source, ea) => connectionRecoveryTcs.SetResult(true); - conn.ConnectionRecoveryError += (source, ea) => connectionRecoveryTcs.SetException(ea.Exception); - conn.CallbackException += (source, ea) => connectionRecoveryTcs.SetException(ea.Exception); + conn.RecoverySucceededAsync += (source, ea) => + { + connectionRecoveryTcs.SetResult(true); + return Task.CompletedTask; + }; + conn.ConnectionRecoveryErrorAsync += (source, ea) => + { + connectionRecoveryTcs.SetException(ea.Exception); + return Task.CompletedTask; + }; + conn.CallbackExceptionAsync += (source, ea) => + { + connectionRecoveryTcs.SetException(ea.Exception); + return Task.CompletedTask; + }; IChannel ch = await conn.CreateChannelAsync(); try @@ -332,7 +356,11 @@ await channel.QueueDeclareAsync(rq.Name, false, false, false, }; AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandlerAsync(exceptionHandler); - conn.RecoverySucceeded += (source, ea) => tcs.SetResult(true); + conn.RecoverySucceededAsync += (source, ea) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; IChannel ch = await conn.CreateChannelAsync(); string queueToRecoverWithException = "recovery.exception.queue"; @@ -388,7 +416,11 @@ public async Task TestTopologyRecoveryExchangeExceptionHandler() }; AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandlerAsync(exceptionHandler); - conn.RecoverySucceeded += (source, ea) => tcs.SetResult(true); + conn.RecoverySucceededAsync += (source, ea) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; string exchangeToRecoverWithException = "recovery.exception.exchange"; string exchangeToRecoverSuccessfully = "successfully.recovered.exchange"; @@ -449,7 +481,11 @@ public async Task TestTopologyRecoveryBindingExceptionHandler() }; AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandlerAsync(exceptionHandler); - conn.RecoverySucceeded += (source, ea) => connectionRecoveryTcs.SetResult(true); + conn.RecoverySucceededAsync += (source, ea) => + { + connectionRecoveryTcs.SetResult(true); + return Task.CompletedTask; + }; IChannel ch = await conn.CreateChannelAsync(); const string queueWithRecoveredBinding = "successfully.recovered.queue"; @@ -511,7 +547,11 @@ public async Task TestTopologyRecoveryConsumerExceptionHandler() }; AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandlerAsync(exceptionHandler); - conn.RecoverySucceeded += (source, ea) => connectionRecoveryTcs.SetResult(true); + conn.RecoverySucceededAsync += (source, ea) => + { + connectionRecoveryTcs.SetResult(true); + return Task.CompletedTask; + }; IChannel ch = await conn.CreateChannelAsync(); try { diff --git a/projects/Test/Integration/TestFloodPublishing.cs b/projects/Test/Integration/TestFloodPublishing.cs index d9c1698bf..3b4521ef3 100644 --- a/projects/Test/Integration/TestFloodPublishing.cs +++ b/projects/Test/Integration/TestFloodPublishing.cs @@ -66,7 +66,7 @@ public async Task TestUnthrottledFloodPublishing() Assert.IsNotType(_conn); _channel = await _conn.CreateChannelAsync(); - _conn.ConnectionShutdown += (_, ea) => + _conn.ConnectionShutdownAsync += (_, ea) => { HandleConnectionShutdown(_conn, ea, (args) => { @@ -75,6 +75,7 @@ public async Task TestUnthrottledFloodPublishing() sawUnexpectedShutdown = true; } }); + return Task.CompletedTask; }; _channel.ChannelShutdown += (o, ea) => @@ -131,7 +132,7 @@ public async Task TestMultithreadFloodPublishing() var allMessagesSeenTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _conn.ConnectionShutdown += (o, ea) => + _conn.ConnectionShutdownAsync += (o, ea) => { HandleConnectionShutdown(_conn, ea, (args) => { @@ -141,6 +142,7 @@ public async Task TestMultithreadFloodPublishing() allMessagesSeenTcs.TrySetException(args.Exception); } }); + return Task.CompletedTask; }; _channel.ChannelShutdown += (o, ea) => @@ -164,7 +166,7 @@ public async Task TestMultithreadFloodPublishing() bool stop = false; using (IConnection publishConnection = await _connFactory.CreateConnectionAsync()) { - publishConnection.ConnectionShutdown += (o, ea) => + publishConnection.ConnectionShutdownAsync += (o, ea) => { HandleConnectionShutdown(_conn, ea, (args) => { @@ -174,6 +176,7 @@ public async Task TestMultithreadFloodPublishing() allMessagesSeenTcs.TrySetException(args.Exception); } }); + return Task.CompletedTask; }; using (IChannel publishChannel = await publishConnection.CreateChannelAsync()) @@ -215,7 +218,7 @@ public async Task TestMultithreadFloodPublishing() { using (IConnection consumeConnection = await _connFactory.CreateConnectionAsync()) { - consumeConnection.ConnectionShutdown += (o, ea) => + consumeConnection.ConnectionShutdownAsync += (o, ea) => { HandleConnectionShutdown(_conn, ea, (args) => { @@ -225,6 +228,7 @@ public async Task TestMultithreadFloodPublishing() allMessagesSeenTcs.TrySetException(args.Exception); } }); + return Task.CompletedTask; }; using (IChannel consumeChannel = await consumeConnection.CreateChannelAsync()) diff --git a/projects/Test/Integration/TestPublishSharedChannelAsync.cs b/projects/Test/Integration/TestPublishSharedChannelAsync.cs index 7c7d094ca..3c508d087 100644 --- a/projects/Test/Integration/TestPublishSharedChannelAsync.cs +++ b/projects/Test/Integration/TestPublishSharedChannelAsync.cs @@ -78,7 +78,7 @@ public async Task MultiThreadPublishOnSharedChannel() try { Assert.IsNotType(conn); - conn.ConnectionShutdown += HandleConnectionShutdown; + conn.ConnectionShutdownAsync += HandleConnectionShutdownAsync; using (IChannel channel = await conn.CreateChannelAsync()) { diff --git a/projects/Test/Integration/TestQueueDeclare.cs b/projects/Test/Integration/TestQueueDeclare.cs index 8ba26841b..fc6315983 100644 --- a/projects/Test/Integration/TestQueueDeclare.cs +++ b/projects/Test/Integration/TestQueueDeclare.cs @@ -62,7 +62,7 @@ public async Task TestConcurrentQueueDeclareAndBindAsync() { bool sawShutdown = false; - _conn.ConnectionShutdown += (o, ea) => + _conn.ConnectionShutdownAsync += (o, ea) => { HandleConnectionShutdown(_conn, ea, (args) => { @@ -71,6 +71,7 @@ public async Task TestConcurrentQueueDeclareAndBindAsync() sawShutdown = true; } }); + return Task.CompletedTask; }; _channel.ChannelShutdown += (o, ea) => diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs index b6f255757..52656c252 100644 --- a/projects/Test/Integration/TestToxiproxy.cs +++ b/projects/Test/Integration/TestToxiproxy.cs @@ -85,19 +85,21 @@ public async Task TestCloseConnection() { using (IConnection conn = await cf.CreateConnectionAsync()) { - conn.CallbackException += (s, ea) => + conn.CallbackExceptionAsync += (s, ea) => { _output.WriteLine($"[ERROR] unexpected callback exception {ea.Detail} {ea.Exception}"); recoverySucceededTcs.SetResult(false); + return Task.CompletedTask; }; - conn.ConnectionRecoveryError += (s, ea) => + conn.ConnectionRecoveryErrorAsync += (s, ea) => { _output.WriteLine($"[ERROR] connection recovery error {ea.Exception}"); recoverySucceededTcs.SetResult(false); + return Task.CompletedTask; }; - conn.ConnectionShutdown += (s, ea) => + conn.ConnectionShutdownAsync += (s, ea) => { if (IsVerbose) { @@ -109,9 +111,10 @@ public async Task TestCloseConnection() * test exits, and connectionShutdownTcs will have already been set */ connectionShutdownTcs.TrySetResult(true); + return Task.CompletedTask; }; - conn.RecoverySucceeded += (s, ea) => + conn.RecoverySucceededAsync += (s, ea) => { if (IsVerbose) { @@ -119,6 +122,7 @@ public async Task TestCloseConnection() } recoverySucceededTcs.SetResult(true); + return Task.CompletedTask; }; async Task PublishLoop() @@ -265,9 +269,10 @@ public async Task TestTcpReset_GH1464() { using (IConnection conn = await cf.CreateConnectionAsync()) { - conn.ConnectionShutdown += (o, ea) => + conn.ConnectionShutdownAsync += (o, ea) => { connectionShutdownTcs.SetResult(true); + return Task.CompletedTask; }; using (IChannel ch = await conn.CreateChannelAsync()) diff --git a/projects/Test/OAuth2/TestOAuth2.cs b/projects/Test/OAuth2/TestOAuth2.cs index 9a25b687d..5b061ab29 100644 --- a/projects/Test/OAuth2/TestOAuth2.cs +++ b/projects/Test/OAuth2/TestOAuth2.cs @@ -83,20 +83,23 @@ public async Task InitializeAsync() _connection = await _connectionFactory.CreateConnectionAsync(_cancellationTokenSource.Token); - _connection.ConnectionShutdown += (sender, ea) => + _connection.ConnectionShutdownAsync += (sender, ea) => { _testOutputHelper.WriteLine("{0} [WARNING] connection shutdown!", DateTime.Now); + return Task.CompletedTask; }; - _connection.ConnectionRecoveryError += (sender, ea) => + _connection.ConnectionRecoveryErrorAsync += (sender, ea) => { _testOutputHelper.WriteLine("{0} [ERROR] connection recovery error: {1}", DateTime.Now, ea.Exception); + return Task.CompletedTask; }; - _connection.RecoverySucceeded += (sender, ea) => + _connection.RecoverySucceededAsync += (sender, ea) => { _testOutputHelper.WriteLine("{0} [INFO] connection recovery succeeded", DateTime.Now); + return Task.CompletedTask; }; _credentialsRefresher = new CredentialsRefresher(_producerCredentialsProvider, diff --git a/projects/Test/SequentialIntegration/TestConnectionBlocked.cs b/projects/Test/SequentialIntegration/TestConnectionBlocked.cs index 591b39ba9..c610da454 100644 --- a/projects/Test/SequentialIntegration/TestConnectionBlocked.cs +++ b/projects/Test/SequentialIntegration/TestConnectionBlocked.cs @@ -60,14 +60,16 @@ public override async Task DisposeAsync() public async Task TestConnectionBlockedNotification() { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _conn.ConnectionBlocked += (object sender, ConnectionBlockedEventArgs args) => + _conn.ConnectionBlockedAsync += async (object sender, ConnectionBlockedEventArgs args) => { - UnblockAsync(); + // TODO should this continue to be doing fire and forget? + await UnblockAsync(); }; - _conn.ConnectionUnblocked += (object sender, EventArgs ea) => + _conn.ConnectionUnblockedAsync += (object sender, EventArgs ea) => { tcs.SetResult(true); + return Task.CompletedTask; }; await BlockAsync(_channel); diff --git a/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs b/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs index e1f50e1a0..daeab8ab3 100644 --- a/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs +++ b/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs @@ -82,14 +82,16 @@ public async Task TestConnectionBlockedChannelLeak_GH1573() string exchangeName = GenerateExchangeName(); - _conn.ConnectionBlocked += (object sender, ConnectionBlockedEventArgs args) => + _conn.ConnectionBlockedAsync += (object sender, ConnectionBlockedEventArgs args) => { connectionBlockedTcs.SetResult(true); + return Task.CompletedTask; }; - _conn.ConnectionUnblocked += (object sender, EventArgs ea) => + _conn.ConnectionUnblockedAsync += (object sender, EventArgs ea) => { connectionUnblockedTcs.SetResult(true); + return Task.CompletedTask; }; async Task ExchangeDeclareAndPublish() diff --git a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs index 2edfa9f46..dc1831a7b 100644 --- a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs +++ b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs @@ -146,7 +146,11 @@ public async Task TestBlockedListenersRecovery() try { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _conn.ConnectionBlocked += (c, reason) => tcs.SetResult(true); + _conn.ConnectionBlockedAsync += (c, reason) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; await CloseAndWaitForRecoveryAsync(); await CloseAndWaitForRecoveryAsync(); await BlockAsync(_channel); @@ -219,11 +223,12 @@ public async Task TestServerNamedTransientAutoDeleteQueueAndBindingRecovery() string nameAfter = null; var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - ((AutorecoveringConnection)_conn).QueueNameChangedAfterRecovery += (source, ea) => + ((AutorecoveringConnection)_conn).QueueNameChangedAfterRecoveryAsync += (source, ea) => { nameBefore = ea.NameBefore; nameAfter = ea.NameAfter; tcs.SetResult(true); + return Task.CompletedTask; }; await _channel.QueueBindAsync(queue: nameBefore, exchange: x, routingKey: ""); @@ -248,7 +253,11 @@ public async Task TestServerNamedTransientAutoDeleteQueueAndBindingRecovery() public async Task TestShutdownEventHandlersRecoveryOnConnectionAfterDelayedServerRestart() { int counter = 0; - _conn.ConnectionShutdown += (c, args) => Interlocked.Increment(ref counter); + _conn.ConnectionShutdownAsync += (c, args) => + { + Interlocked.Increment(ref counter); + return Task.CompletedTask; + }; TaskCompletionSource shutdownLatch = PrepareForShutdown(_conn); TaskCompletionSource recoveryLatch = PrepareForRecovery((AutorecoveringConnection)_conn); @@ -280,13 +289,18 @@ public async Task TestShutdownEventHandlersRecoveryOnConnectionAfterTwoDelayedSe AutorecoveringConnection aconn = (AutorecoveringConnection)_conn; - aconn.ConnectionRecoveryError += (c, args) => + aconn.ConnectionRecoveryErrorAsync += (c, args) => { // Uncomment for debugging // _output.WriteLine("[INFO] ConnectionRecoveryError: {0}", args.Exception); + return Task.CompletedTask; }; - aconn.ConnectionShutdown += (c, args) => Interlocked.Increment(ref counter); + aconn.ConnectionShutdownAsync += (c, args) => + { + Interlocked.Increment(ref counter); + return Task.CompletedTask; + }; Assert.True(_conn.IsOpen); @@ -322,7 +336,11 @@ public async Task TestShutdownEventHandlersRecoveryOnConnectionAfterTwoDelayedSe public async Task TestUnblockedListenersRecovery() { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _conn.ConnectionUnblocked += (source, ea) => tcs.SetResult(true); + _conn.ConnectionUnblockedAsync += (source, ea) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; await CloseAndWaitForRecoveryAsync(); await CloseAndWaitForRecoveryAsync(); await BlockAsync(_channel); diff --git a/projects/Test/SequentialIntegration/TestHeartbeats.cs b/projects/Test/SequentialIntegration/TestHeartbeats.cs index 38882e569..7699e5660 100644 --- a/projects/Test/SequentialIntegration/TestHeartbeats.cs +++ b/projects/Test/SequentialIntegration/TestHeartbeats.cs @@ -114,9 +114,10 @@ public async Task TestHundredsOfConnectionsWithRandomHeartbeatInterval() IConnection conn = await cf.CreateConnectionAsync($"{_testDisplayName}:{i}"); conns.Add(conn); IChannel ch = await conn.CreateChannelAsync(); - conn.ConnectionShutdown += (sender, evt) => + conn.ConnectionShutdownAsync += (sender, evt) => { CheckInitiator(evt); + return Task.CompletedTask; }; } @@ -139,7 +140,7 @@ private async Task RunSingleConnectionTestAsync(ConnectionFactory cf) { bool wasShutdown = false; - conn.ConnectionShutdown += (sender, evt) => + conn.ConnectionShutdownAsync += (sender, evt) => { lock (conn) { @@ -149,6 +150,7 @@ private async Task RunSingleConnectionTestAsync(ConnectionFactory cf) wasShutdown = true; } } + return Task.CompletedTask; }; await SleepFor(30);