Skip to content

Commit fd36d23

Browse files
authored
Merge pull request #1680 from danielmarbach/async-channel-events
Make channel events async
2 parents 6f15686 + 48e6307 commit fd36d23

32 files changed

+237
-224
lines changed

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -426,20 +426,20 @@ RabbitMQ.Client.IBasicProperties.UserId.get -> string
426426
RabbitMQ.Client.IBasicProperties.UserId.set -> void
427427
RabbitMQ.Client.IChannel
428428
RabbitMQ.Client.IChannel.BasicAckAsync(ulong deliveryTag, bool multiple, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
429-
RabbitMQ.Client.IChannel.BasicAcks -> System.EventHandler<RabbitMQ.Client.Events.BasicAckEventArgs>
429+
RabbitMQ.Client.IChannel.BasicAcksAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.BasicAckEventArgs>
430430
RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
431-
RabbitMQ.Client.IChannel.BasicNacks -> System.EventHandler<RabbitMQ.Client.Events.BasicNackEventArgs>
432-
RabbitMQ.Client.IChannel.BasicReturn -> System.EventHandler<RabbitMQ.Client.Events.BasicReturnEventArgs>
433-
RabbitMQ.Client.IChannel.CallbackException -> System.EventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
431+
RabbitMQ.Client.IChannel.BasicNacksAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.BasicNackEventArgs>
432+
RabbitMQ.Client.IChannel.BasicReturnAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.BasicReturnEventArgs>
433+
RabbitMQ.Client.IChannel.CallbackExceptionAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
434434
RabbitMQ.Client.IChannel.ChannelNumber.get -> int
435-
RabbitMQ.Client.IChannel.ChannelShutdown -> System.EventHandler<RabbitMQ.Client.ShutdownEventArgs>
435+
RabbitMQ.Client.IChannel.ChannelShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.ShutdownEventArgs>
436436
RabbitMQ.Client.IChannel.CloseReason.get -> RabbitMQ.Client.ShutdownEventArgs
437437
RabbitMQ.Client.IChannel.ContinuationTimeout.get -> System.TimeSpan
438438
RabbitMQ.Client.IChannel.ContinuationTimeout.set -> void
439439
RabbitMQ.Client.IChannel.CurrentQueue.get -> string
440440
RabbitMQ.Client.IChannel.DefaultConsumer.get -> RabbitMQ.Client.IAsyncBasicConsumer
441441
RabbitMQ.Client.IChannel.DefaultConsumer.set -> void
442-
RabbitMQ.Client.IChannel.FlowControl -> System.EventHandler<RabbitMQ.Client.Events.FlowControlEventArgs>
442+
RabbitMQ.Client.IChannel.FlowControlAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.FlowControlEventArgs>
443443
RabbitMQ.Client.IChannel.GetNextPublishSequenceNumberAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<ulong>
444444
RabbitMQ.Client.IChannel.IsClosed.get -> bool
445445
RabbitMQ.Client.IChannel.IsOpen.get -> bool
@@ -550,7 +550,7 @@ RabbitMQ.Client.IRecordedQueue.Exclusive.get -> bool
550550
RabbitMQ.Client.IRecordedQueue.IsServerNamed.get -> bool
551551
RabbitMQ.Client.IRecordedQueue.Name.get -> string
552552
RabbitMQ.Client.IRecoverable
553-
RabbitMQ.Client.IRecoverable.Recovery -> System.EventHandler<System.EventArgs>
553+
RabbitMQ.Client.IRecoverable.RecoveryAsync -> RabbitMQ.Client.Events.AsyncEventHandler<System.EventArgs>
554554
RabbitMQ.Client.ITcpClient
555555
RabbitMQ.Client.ITcpClient.Client.get -> System.Net.Sockets.Socket
556556
RabbitMQ.Client.ITcpClient.Close() -> void

projects/RabbitMQ.Client/client/api/IChannel.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -106,28 +106,28 @@ public interface IChannel : IDisposable
106106
/// <summary>
107107
/// Signalled when a Basic.Ack command arrives from the broker.
108108
/// </summary>
109-
event EventHandler<BasicAckEventArgs> BasicAcks;
109+
event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync;
110110

111111
/// <summary>
112112
/// Signalled when a Basic.Nack command arrives from the broker.
113113
/// </summary>
114-
event EventHandler<BasicNackEventArgs> BasicNacks;
114+
event AsyncEventHandler<BasicNackEventArgs> BasicNacksAsync;
115115

116116
/// <summary>
117117
/// Signalled when a Basic.Return command arrives from the broker.
118118
/// </summary>
119-
event EventHandler<BasicReturnEventArgs> BasicReturn;
119+
event AsyncEventHandler<BasicReturnEventArgs> BasicReturnAsync;
120120

121121
/// <summary>
122122
/// Signalled when an exception occurs in a callback invoked by the channel.
123123
///
124124
/// Examples of cases where this event will be signalled
125125
/// include exceptions thrown in <see cref="IAsyncBasicConsumer"/> methods, or
126-
/// exceptions thrown in <see cref="ChannelShutdown"/> delegates etc.
126+
/// exceptions thrown in <see cref="ChannelShutdownAsync"/> delegates etc.
127127
/// </summary>
128-
event EventHandler<CallbackExceptionEventArgs> CallbackException;
128+
event AsyncEventHandler<CallbackExceptionEventArgs> CallbackExceptionAsync;
129129

130-
event EventHandler<FlowControlEventArgs> FlowControl;
130+
event AsyncEventHandler<FlowControlEventArgs> FlowControlAsync;
131131

132132
/// <summary>
133133
/// Notifies the destruction of the channel.
@@ -136,7 +136,7 @@ public interface IChannel : IDisposable
136136
/// If the channel is already destroyed at the time an event
137137
/// handler is added to this event, the event handler will be fired immediately.
138138
/// </remarks>
139-
event EventHandler<ShutdownEventArgs> ChannelShutdown;
139+
event AsyncEventHandler<ShutdownEventArgs> ChannelShutdownAsync;
140140

141141
/// <summary>
142142
/// When in confirm mode, return the sequence number of the next message to be published.
@@ -268,7 +268,7 @@ Task CloseAsync(ShutdownEventArgs reason, bool abort,
268268
/// <summary>
269269
/// Asynchronously enable publisher confirmations.
270270
/// </summary>
271-
/// <param name="trackConfirmations">Set to <c>false</c> if tracking via <see cref="BasicAcks"/> and <see cref="BasicNacks"/> yourself.</param>
271+
/// <param name="trackConfirmations">Set to <c>false</c> if tracking via <see cref="BasicAcksAsync"/> and <see cref="BasicNacksAsync"/> yourself.</param>
272272
/// <param name="cancellationToken">CancellationToken for this operation.</param>
273273
Task ConfirmSelectAsync(bool trackConfirmations = true,
274274
CancellationToken cancellationToken = default);

projects/RabbitMQ.Client/client/api/IRecoverable.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using RabbitMQ.Client.Events;
3334

3435
namespace RabbitMQ.Client
3536
{
@@ -38,6 +39,6 @@ namespace RabbitMQ.Client
3839
/// </summary>
3940
public interface IRecoverable
4041
{
41-
event EventHandler<EventArgs> Recovery;
42+
event AsyncEventHandler<EventArgs> RecoveryAsync;
4243
}
4344
}

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
9797
}
9898
case ProtocolCommandId.BasicReturn:
9999
{
100-
HandleBasicReturn(cmd);
101-
return Task.FromResult(true);
100+
// Note: always returns true
101+
return HandleBasicReturn(cmd);
102102
}
103103
case ProtocolCommandId.ChannelClose:
104104
{

projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,9 +323,10 @@ public override void HandleChannelShutdown(ShutdownEventArgs reason)
323323
// Nothing to do here!
324324
}
325325

326-
public void OnConnectionShutdown(object? sender, ShutdownEventArgs reason)
326+
public Task OnConnectionShutdownAsync(object? sender, ShutdownEventArgs reason)
327327
{
328328
_tcs.TrySetResult(true);
329+
return Task.CompletedTask;
329330
}
330331
}
331332

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -79,46 +79,46 @@ public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel
7979
_consumerDispatchConcurrency = consumerDispatchConcurrency;
8080
}
8181

82-
public event EventHandler<BasicAckEventArgs> BasicAcks
82+
public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
8383
{
84-
add => InnerChannel.BasicAcks += value;
85-
remove => InnerChannel.BasicAcks -= value;
84+
add => InnerChannel.BasicAcksAsync += value;
85+
remove => InnerChannel.BasicAcksAsync -= value;
8686
}
8787

88-
public event EventHandler<BasicNackEventArgs> BasicNacks
88+
public event AsyncEventHandler<BasicNackEventArgs> BasicNacksAsync
8989
{
90-
add => InnerChannel.BasicNacks += value;
91-
remove => InnerChannel.BasicNacks -= value;
90+
add => InnerChannel.BasicNacksAsync += value;
91+
remove => InnerChannel.BasicNacksAsync -= value;
9292
}
9393

94-
public event EventHandler<BasicReturnEventArgs> BasicReturn
94+
public event AsyncEventHandler<BasicReturnEventArgs> BasicReturnAsync
9595
{
96-
add => InnerChannel.BasicReturn += value;
97-
remove => InnerChannel.BasicReturn -= value;
96+
add => InnerChannel.BasicReturnAsync += value;
97+
remove => InnerChannel.BasicReturnAsync -= value;
9898
}
9999

100-
public event EventHandler<CallbackExceptionEventArgs> CallbackException
100+
public event AsyncEventHandler<CallbackExceptionEventArgs> CallbackExceptionAsync
101101
{
102-
add => InnerChannel.CallbackException += value;
103-
remove => InnerChannel.CallbackException -= value;
102+
add => InnerChannel.CallbackExceptionAsync += value;
103+
remove => InnerChannel.CallbackExceptionAsync -= value;
104104
}
105105

106-
public event EventHandler<FlowControlEventArgs> FlowControl
106+
public event AsyncEventHandler<FlowControlEventArgs> FlowControlAsync
107107
{
108-
add { InnerChannel.FlowControl += value; }
109-
remove { InnerChannel.FlowControl -= value; }
108+
add { InnerChannel.FlowControlAsync += value; }
109+
remove { InnerChannel.FlowControlAsync -= value; }
110110
}
111111

112-
public event EventHandler<ShutdownEventArgs> ChannelShutdown
112+
public event AsyncEventHandler<ShutdownEventArgs> ChannelShutdownAsync
113113
{
114-
add => InnerChannel.ChannelShutdown += value;
115-
remove => InnerChannel.ChannelShutdown -= value;
114+
add => InnerChannel.ChannelShutdownAsync += value;
115+
remove => InnerChannel.ChannelShutdownAsync -= value;
116116
}
117117

118-
public event EventHandler<EventArgs> Recovery
118+
public event AsyncEventHandler<EventArgs> RecoveryAsync
119119
{
120-
add { InnerChannel.Recovery += value; }
121-
remove { InnerChannel.Recovery -= value; }
120+
add { InnerChannel.RecoveryAsync += value; }
121+
remove { InnerChannel.RecoveryAsync -= value; }
122122
}
123123

124124
public IEnumerable<string> ConsumerTags
@@ -213,7 +213,8 @@ await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaph
213213
.ConfigureAwait(false);
214214
}
215215

216-
_innerChannel.RunRecoveryEventHandlers(this);
216+
await _innerChannel.RunRecoveryEventHandlers(this)
217+
.ConfigureAwait(false);
217218

218219
return true;
219220
}

0 commit comments

Comments
 (0)