Skip to content

Commit 7da7aa6

Browse files
authored
Merge pull request #835 from stebet/threadingChannels
Adding System.Threading.Channels for simplified, better performing code.
2 parents ab614d9 + 1c72f6e commit 7da7aa6

File tree

5 files changed

+47
-145
lines changed

5 files changed

+47
-145
lines changed

projects/RabbitMQ.Client/RabbitMQ.Client.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
5858
<PackageReference Include="MinVer" Version="2.2.0" PrivateAssets="All" />
5959
<PackageReference Include="System.Memory" Version="4.5.4" />
60+
<PackageReference Include="System.Threading.Channels" Version="4.7.1" />
6061
</ItemGroup>
6162

6263
</Project>

projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,8 @@ public override async Task HandleBasicConsumeOk(string consumerTag)
4747
///<summary>Fires the Received event.</summary>
4848
public override async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
4949
{
50-
await base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body).ConfigureAwait(false);
51-
await (Received?.Invoke(
52-
this,
53-
new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)) ?? Task.CompletedTask).ConfigureAwait(false);
50+
await base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
51+
await (Received?.Invoke(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)) ?? Task.CompletedTask).ConfigureAwait(false);
5452
}
5553

5654
///<summary>Fires the Shutdown event.</summary>

projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
using System.Collections.Concurrent;
1+
using System;
2+
using System.Collections.Concurrent;
23
using System.Threading;
4+
using System.Threading.Channels;
35
using System.Threading.Tasks;
46

57
namespace RabbitMQ.Client.Impl
@@ -32,19 +34,14 @@ public Task Stop(IModel model)
3234

3335
class WorkPool
3436
{
35-
readonly ConcurrentQueue<Work> _workQueue;
36-
readonly CancellationTokenSource _tokenSource;
37+
readonly Channel<Work> _channel;
3738
readonly ModelBase _model;
38-
readonly CancellationTokenRegistration _tokenRegistration;
39-
volatile TaskCompletionSource<bool> _syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
4039
private Task _worker;
4140

4241
public WorkPool(ModelBase model)
4342
{
4443
_model = model;
45-
_workQueue = new ConcurrentQueue<Work>();
46-
_tokenSource = new CancellationTokenSource();
47-
_tokenRegistration = _tokenSource.Token.Register(() => _syncSource.TrySetCanceled());
44+
_channel = Channel.CreateUnbounded<Work>(new UnboundedChannelOptions { SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false });
4845
}
4946

5047
public void Start()
@@ -54,35 +51,34 @@ public void Start()
5451

5552
public void Enqueue(Work work)
5653
{
57-
_workQueue.Enqueue(work);
58-
_syncSource.TrySetResult(true);
54+
_channel.Writer.TryWrite(work);
5955
}
6056

6157
async Task Loop()
6258
{
63-
while (_tokenSource.IsCancellationRequested == false)
59+
while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false))
6460
{
65-
try
61+
while (_channel.Reader.TryRead(out Work work))
6662
{
67-
await _syncSource.Task.ConfigureAwait(false);
68-
_syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
69-
}
70-
catch (TaskCanceledException)
71-
{
72-
// Swallowing the task cancellation in case we are stopping work.
73-
}
63+
try
64+
{
65+
Task task = work.Execute(_model);
66+
if (!task.IsCompleted)
67+
{
68+
await task.ConfigureAwait(false);
69+
}
70+
}
71+
catch(Exception)
72+
{
7473

75-
while (_workQueue.TryDequeue(out Work work))
76-
{
77-
await work.Execute(_model).ConfigureAwait(false);
74+
}
7875
}
7976
}
8077
}
8178

8279
public Task Stop()
8380
{
84-
_tokenSource.Cancel();
85-
_tokenRegistration.Dispose();
81+
_channel.Writer.Complete();
8682
return _worker;
8783
}
8884
}

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,12 @@
4242
using System.Collections.Concurrent;
4343
using System.Collections.Generic;
4444
using System.Linq;
45-
using System.Threading;
45+
using System.Threading.Channels;
4646
using System.Threading.Tasks;
4747

4848
using RabbitMQ.Client.Events;
4949
using RabbitMQ.Client.Exceptions;
5050
using RabbitMQ.Client.Impl;
51-
52-
using RabbitMQ.Util;
5351
using RabbitMQ.Client.Logging;
5452

5553
namespace RabbitMQ.Client.Framing.Impl
@@ -664,7 +662,7 @@ private void Init(IFrameHandler fh)
664662
{
665663
if (ShouldTriggerConnectionRecovery(args))
666664
{
667-
_recoveryLoopCommandQueue.Enqueue(RecoveryCommand.BeginAutomaticRecovery);
665+
_recoveryLoopCommandQueue.Writer.TryWrite(RecoveryCommand.BeginAutomaticRecovery);
668666
}
669667
};
670668
lock (_eventLock)
@@ -1243,9 +1241,7 @@ private enum RecoveryConnectionState
12431241
private Task _recoveryTask;
12441242
private RecoveryConnectionState _recoveryLoopState = RecoveryConnectionState.Connected;
12451243

1246-
private readonly AsyncConcurrentQueue<RecoveryCommand> _recoveryLoopCommandQueue = new AsyncConcurrentQueue<RecoveryCommand>();
1247-
private readonly CancellationTokenSource _recoveryCancellationToken = new CancellationTokenSource();
1248-
private readonly TaskCompletionSource<int> _recoveryLoopComplete = new TaskCompletionSource<int>();
1244+
private readonly Channel<RecoveryCommand> _recoveryLoopCommandQueue = Channel.CreateUnbounded<RecoveryCommand>(new UnboundedChannelOptions { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = false });
12491245

12501246
/// <summary>
12511247
/// This is the main loop for the auto-recovery thread.
@@ -1254,21 +1250,22 @@ private async Task MainRecoveryLoop()
12541250
{
12551251
try
12561252
{
1257-
while (!_recoveryCancellationToken.IsCancellationRequested)
1253+
while (await _recoveryLoopCommandQueue.Reader.WaitToReadAsync().ConfigureAwait(false))
12581254
{
1259-
var command = await _recoveryLoopCommandQueue.DequeueAsync(_recoveryCancellationToken.Token).ConfigureAwait(false);
1260-
1261-
switch (_recoveryLoopState)
1255+
while (_recoveryLoopCommandQueue.Reader.TryRead(out RecoveryCommand command))
12621256
{
1263-
case RecoveryConnectionState.Connected:
1264-
RecoveryLoopConnectedHandler(command);
1265-
break;
1266-
case RecoveryConnectionState.Recovering:
1267-
RecoveryLoopRecoveringHandler(command);
1268-
break;
1269-
default:
1270-
ESLog.Warn("RecoveryLoop state is out of range.");
1271-
break;
1257+
switch (_recoveryLoopState)
1258+
{
1259+
case RecoveryConnectionState.Connected:
1260+
RecoveryLoopConnectedHandler(command);
1261+
break;
1262+
case RecoveryConnectionState.Recovering:
1263+
RecoveryLoopRecoveringHandler(command);
1264+
break;
1265+
default:
1266+
ESLog.Warn("RecoveryLoop state is out of range.");
1267+
break;
1268+
}
12721269
}
12731270
}
12741271
}
@@ -1280,8 +1277,6 @@ private async Task MainRecoveryLoop()
12801277
{
12811278
ESLog.Error("Main recovery loop threw unexpected exception.", e);
12821279
}
1283-
1284-
_recoveryLoopComplete.SetResult(0);
12851280
}
12861281

12871282
/// <summary>
@@ -1290,8 +1285,10 @@ private async Task MainRecoveryLoop()
12901285
/// </summary>
12911286
private void StopRecoveryLoop()
12921287
{
1293-
_recoveryCancellationToken.Cancel();
1294-
if (!_recoveryLoopComplete.Task.Wait(_factory.RequestedConnectionTimeout))
1288+
_recoveryLoopCommandQueue.Writer.Complete();
1289+
Task timeout = Task.Delay(_factory.RequestedConnectionTimeout);
1290+
1291+
if (Task.WhenAny(_recoveryTask, timeout).Result == timeout)
12951292
{
12961293
ESLog.Warn("Timeout while trying to stop background AutorecoveringConnection recovery loop.");
12971294
}
@@ -1351,11 +1348,9 @@ private void RecoveryLoopConnectedHandler(RecoveryCommand command)
13511348
/// </summary>
13521349
private void ScheduleRecoveryRetry()
13531350
{
1354-
Task.Delay(_factory.NetworkRecoveryInterval)
1355-
.ContinueWith(t =>
1356-
{
1357-
_recoveryLoopCommandQueue.Enqueue(RecoveryCommand.PerformAutomaticRecovery);
1358-
});
1351+
_ = Task
1352+
.Delay(_factory.NetworkRecoveryInterval)
1353+
.ContinueWith(t => _recoveryLoopCommandQueue.Writer.TryWrite(RecoveryCommand.PerformAutomaticRecovery));
13591354
}
13601355
}
13611356
}

projects/RabbitMQ.Client/util/AsyncConcurrentQueue.cs

Lines changed: 0 additions & 88 deletions
This file was deleted.

0 commit comments

Comments
 (0)