Skip to content

Commit f2ba889

Browse files
committed
Making sure AsyncEventHandlers are all run. Fixes issue rabbitmq#838.
1 parent ab614d9 commit f2ba889

File tree

3 files changed

+20
-12
lines changed

3 files changed

+20
-12
lines changed

projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,7 @@ public virtual Task HandleModelShutdown(object model, ShutdownEventArgs reason)
140140
public virtual async Task OnCancel(params string[] consumerTags)
141141
{
142142
IsRunning = false;
143-
foreach (AsyncEventHandler<ConsumerEventArgs> h in ConsumerCancelled?.GetInvocationList() ?? Array.Empty<Delegate>())
144-
{
145-
await h(this, new ConsumerEventArgs(consumerTags)).ConfigureAwait(false);
146-
}
147-
143+
await ConsumerCancelled.InvokeAsync(this, new ConsumerEventArgs(consumerTags)).ConfigureAwait(false);
148144
foreach (string consumerTag in consumerTags)
149145
{
150146
_consumerTags.Remove(consumerTag);

projects/RabbitMQ.Client/client/events/AsyncEventHandler.cs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,18 @@
44
namespace RabbitMQ.Client.Events
55
{
66
public delegate Task AsyncEventHandler<in TEvent>(object sender, TEvent @event) where TEvent : EventArgs;
7-
}
7+
8+
internal static class AsyncEventHandlerExtensions
9+
{
10+
public static async Task InvokeAsync<TEvent>(this AsyncEventHandler<TEvent> eventHandler, object sender, TEvent @event) where TEvent : EventArgs
11+
{
12+
if(eventHandler != null)
13+
{
14+
foreach(AsyncEventHandler<TEvent> handlerInstance in eventHandler.GetInvocationList())
15+
{
16+
await handlerInstance(sender, @event).ConfigureAwait(false);
17+
}
18+
}
19+
}
20+
}
21+
}

projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,30 +34,28 @@ public AsyncEventingBasicConsumer(IModel model) : base(model)
3434
public override async Task HandleBasicCancelOk(string consumerTag)
3535
{
3636
await base.HandleBasicCancelOk(consumerTag).ConfigureAwait(false);
37-
await (Unregistered?.Invoke(this, new ConsumerEventArgs(new[] { consumerTag })) ?? Task.CompletedTask).ConfigureAwait(false);
37+
await Unregistered.InvokeAsync(this, new ConsumerEventArgs(new[] { consumerTag })).ConfigureAwait(false);
3838
}
3939

4040
///<summary>Fires when the server confirms successful consumer registration.</summary>
4141
public override async Task HandleBasicConsumeOk(string consumerTag)
4242
{
4343
await base.HandleBasicConsumeOk(consumerTag).ConfigureAwait(false);
44-
await (Registered?.Invoke(this, new ConsumerEventArgs(new[] { consumerTag })) ?? Task.CompletedTask).ConfigureAwait(false);
44+
await Registered.InvokeAsync(this, new ConsumerEventArgs(new[] { consumerTag })).ConfigureAwait(false);
4545
}
4646

4747
///<summary>Fires the Received event.</summary>
4848
public override async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
4949
{
5050
await base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body).ConfigureAwait(false);
51-
await (Received?.Invoke(
52-
this,
53-
new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)) ?? Task.CompletedTask).ConfigureAwait(false);
51+
await Received.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)).ConfigureAwait(false);
5452
}
5553

5654
///<summary>Fires the Shutdown event.</summary>
5755
public override async Task HandleModelShutdown(object model, ShutdownEventArgs reason)
5856
{
5957
await base.HandleModelShutdown(model, reason).ConfigureAwait(false);
60-
await (Shutdown?.Invoke(this, reason) ?? Task.CompletedTask).ConfigureAwait(false);
58+
await Shutdown.InvokeAsync(this, reason).ConfigureAwait(false);
6159
}
6260
}
6361
}

0 commit comments

Comments
 (0)