Skip to content

Make channel events async #1680

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions projects/RabbitMQ.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -426,20 +426,20 @@ RabbitMQ.Client.IBasicProperties.UserId.get -> string
RabbitMQ.Client.IBasicProperties.UserId.set -> void
RabbitMQ.Client.IChannel
RabbitMQ.Client.IChannel.BasicAckAsync(ulong deliveryTag, bool multiple, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicAcks -> System.EventHandler<RabbitMQ.Client.Events.BasicAckEventArgs>
RabbitMQ.Client.IChannel.BasicAcksAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.BasicAckEventArgs>
RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicNacks -> System.EventHandler<RabbitMQ.Client.Events.BasicNackEventArgs>
RabbitMQ.Client.IChannel.BasicReturn -> System.EventHandler<RabbitMQ.Client.Events.BasicReturnEventArgs>
RabbitMQ.Client.IChannel.CallbackException -> System.EventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
RabbitMQ.Client.IChannel.BasicNacksAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.BasicNackEventArgs>
RabbitMQ.Client.IChannel.BasicReturnAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.BasicReturnEventArgs>
RabbitMQ.Client.IChannel.CallbackExceptionAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
RabbitMQ.Client.IChannel.ChannelNumber.get -> int
RabbitMQ.Client.IChannel.ChannelShutdown -> System.EventHandler<RabbitMQ.Client.ShutdownEventArgs>
RabbitMQ.Client.IChannel.ChannelShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.ShutdownEventArgs>
RabbitMQ.Client.IChannel.CloseReason.get -> RabbitMQ.Client.ShutdownEventArgs
RabbitMQ.Client.IChannel.ContinuationTimeout.get -> System.TimeSpan
RabbitMQ.Client.IChannel.ContinuationTimeout.set -> void
RabbitMQ.Client.IChannel.CurrentQueue.get -> string
RabbitMQ.Client.IChannel.DefaultConsumer.get -> RabbitMQ.Client.IAsyncBasicConsumer
RabbitMQ.Client.IChannel.DefaultConsumer.set -> void
RabbitMQ.Client.IChannel.FlowControl -> System.EventHandler<RabbitMQ.Client.Events.FlowControlEventArgs>
RabbitMQ.Client.IChannel.FlowControlAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.FlowControlEventArgs>
RabbitMQ.Client.IChannel.GetNextPublishSequenceNumberAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<ulong>
RabbitMQ.Client.IChannel.IsClosed.get -> bool
RabbitMQ.Client.IChannel.IsOpen.get -> bool
Expand Down Expand Up @@ -550,7 +550,7 @@ RabbitMQ.Client.IRecordedQueue.Exclusive.get -> bool
RabbitMQ.Client.IRecordedQueue.IsServerNamed.get -> bool
RabbitMQ.Client.IRecordedQueue.Name.get -> string
RabbitMQ.Client.IRecoverable
RabbitMQ.Client.IRecoverable.Recovery -> System.EventHandler<System.EventArgs>
RabbitMQ.Client.IRecoverable.RecoveryAsync -> RabbitMQ.Client.Events.AsyncEventHandler<System.EventArgs>
RabbitMQ.Client.ITcpClient
RabbitMQ.Client.ITcpClient.Client.get -> System.Net.Sockets.Socket
RabbitMQ.Client.ITcpClient.Close() -> void
Expand Down
16 changes: 8 additions & 8 deletions projects/RabbitMQ.Client/client/api/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,28 +106,28 @@ public interface IChannel : IDisposable
/// <summary>
/// Signalled when a Basic.Ack command arrives from the broker.
/// </summary>
event EventHandler<BasicAckEventArgs> BasicAcks;
event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync;

/// <summary>
/// Signalled when a Basic.Nack command arrives from the broker.
/// </summary>
event EventHandler<BasicNackEventArgs> BasicNacks;
event AsyncEventHandler<BasicNackEventArgs> BasicNacksAsync;

/// <summary>
/// Signalled when a Basic.Return command arrives from the broker.
/// </summary>
event EventHandler<BasicReturnEventArgs> BasicReturn;
event AsyncEventHandler<BasicReturnEventArgs> BasicReturnAsync;

/// <summary>
/// Signalled when an exception occurs in a callback invoked by the channel.
///
/// Examples of cases where this event will be signalled
/// include exceptions thrown in <see cref="IAsyncBasicConsumer"/> methods, or
/// exceptions thrown in <see cref="ChannelShutdown"/> delegates etc.
/// exceptions thrown in <see cref="ChannelShutdownAsync"/> delegates etc.
/// </summary>
event EventHandler<CallbackExceptionEventArgs> CallbackException;
event AsyncEventHandler<CallbackExceptionEventArgs> CallbackExceptionAsync;

event EventHandler<FlowControlEventArgs> FlowControl;
event AsyncEventHandler<FlowControlEventArgs> FlowControlAsync;

/// <summary>
/// Notifies the destruction of the channel.
Expand All @@ -136,7 +136,7 @@ public interface IChannel : IDisposable
/// If the channel is already destroyed at the time an event
/// handler is added to this event, the event handler will be fired immediately.
/// </remarks>
event EventHandler<ShutdownEventArgs> ChannelShutdown;
event AsyncEventHandler<ShutdownEventArgs> ChannelShutdownAsync;

/// <summary>
/// When in confirm mode, return the sequence number of the next message to be published.
Expand Down Expand Up @@ -268,7 +268,7 @@ Task CloseAsync(ShutdownEventArgs reason, bool abort,
/// <summary>
/// Asynchronously enable publisher confirmations.
/// </summary>
/// <param name="trackConfirmations">Set to <c>false</c> if tracking via <see cref="BasicAcks"/> and <see cref="BasicNacks"/> yourself.</param>
/// <param name="trackConfirmations">Set to <c>false</c> if tracking via <see cref="BasicAcksAsync"/> and <see cref="BasicNacksAsync"/> yourself.</param>
/// <param name="cancellationToken">CancellationToken for this operation.</param>
Task ConfirmSelectAsync(bool trackConfirmations = true,
CancellationToken cancellationToken = default);
Expand Down
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/client/api/IRecoverable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
//---------------------------------------------------------------------------

using System;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client
{
Expand All @@ -38,6 +39,6 @@ namespace RabbitMQ.Client
/// </summary>
public interface IRecoverable
{
event EventHandler<EventArgs> Recovery;
event AsyncEventHandler<EventArgs> RecoveryAsync;
}
}
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
}
case ProtocolCommandId.BasicReturn:
{
HandleBasicReturn(cmd);
return Task.FromResult(true);
// Note: always returns true
return HandleBasicReturn(cmd);
}
case ProtocolCommandId.ChannelClose:
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,10 @@ public override void HandleChannelShutdown(ShutdownEventArgs reason)
// Nothing to do here!
}

public void OnConnectionShutdown(object? sender, ShutdownEventArgs reason)
public Task OnConnectionShutdownAsync(object? sender, ShutdownEventArgs reason)
{
_tcs.TrySetResult(true);
return Task.CompletedTask;
}
}

Expand Down
45 changes: 23 additions & 22 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,46 +79,46 @@ public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel
_consumerDispatchConcurrency = consumerDispatchConcurrency;
}

public event EventHandler<BasicAckEventArgs> BasicAcks
public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
{
add => InnerChannel.BasicAcks += value;
remove => InnerChannel.BasicAcks -= value;
add => InnerChannel.BasicAcksAsync += value;
remove => InnerChannel.BasicAcksAsync -= value;
}

public event EventHandler<BasicNackEventArgs> BasicNacks
public event AsyncEventHandler<BasicNackEventArgs> BasicNacksAsync
{
add => InnerChannel.BasicNacks += value;
remove => InnerChannel.BasicNacks -= value;
add => InnerChannel.BasicNacksAsync += value;
remove => InnerChannel.BasicNacksAsync -= value;
}

public event EventHandler<BasicReturnEventArgs> BasicReturn
public event AsyncEventHandler<BasicReturnEventArgs> BasicReturnAsync
{
add => InnerChannel.BasicReturn += value;
remove => InnerChannel.BasicReturn -= value;
add => InnerChannel.BasicReturnAsync += value;
remove => InnerChannel.BasicReturnAsync -= value;
}

public event EventHandler<CallbackExceptionEventArgs> CallbackException
public event AsyncEventHandler<CallbackExceptionEventArgs> CallbackExceptionAsync
{
add => InnerChannel.CallbackException += value;
remove => InnerChannel.CallbackException -= value;
add => InnerChannel.CallbackExceptionAsync += value;
remove => InnerChannel.CallbackExceptionAsync -= value;
}

public event EventHandler<FlowControlEventArgs> FlowControl
public event AsyncEventHandler<FlowControlEventArgs> FlowControlAsync
{
add { InnerChannel.FlowControl += value; }
remove { InnerChannel.FlowControl -= value; }
add { InnerChannel.FlowControlAsync += value; }
remove { InnerChannel.FlowControlAsync -= value; }
}

public event EventHandler<ShutdownEventArgs> ChannelShutdown
public event AsyncEventHandler<ShutdownEventArgs> ChannelShutdownAsync
{
add => InnerChannel.ChannelShutdown += value;
remove => InnerChannel.ChannelShutdown -= value;
add => InnerChannel.ChannelShutdownAsync += value;
remove => InnerChannel.ChannelShutdownAsync -= value;
}

public event EventHandler<EventArgs> Recovery
public event AsyncEventHandler<EventArgs> RecoveryAsync
{
add { InnerChannel.Recovery += value; }
remove { InnerChannel.Recovery -= value; }
add { InnerChannel.RecoveryAsync += value; }
remove { InnerChannel.RecoveryAsync -= value; }
}

public IEnumerable<string> ConsumerTags
Expand Down Expand Up @@ -213,7 +213,8 @@ await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaph
.ConfigureAwait(false);
}

_innerChannel.RunRecoveryEventHandlers(this);
await _innerChannel.RunRecoveryEventHandlers(this)
.ConfigureAwait(false);

return true;
}
Expand Down
Loading