Skip to content

Adding System.Threading.Channels for simplified, better performing code. #835

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="MinVer" Version="2.2.0" PrivateAssets="All" />
<PackageReference Include="System.Memory" Version="4.5.4" />
<PackageReference Include="System.Threading.Channels" Version="4.7.1" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,8 @@ public override async Task HandleBasicConsumeOk(string consumerTag)
///<summary>Fires the Received event.</summary>
public override async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
{
await base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body).ConfigureAwait(false);
await (Received?.Invoke(
this,
new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)) ?? Task.CompletedTask).ConfigureAwait(false);
await base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
await (Received?.Invoke(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)) ?? Task.CompletedTask).ConfigureAwait(false);
}

///<summary>Fires the Shutdown event.</summary>
Expand Down
44 changes: 20 additions & 24 deletions projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Collections.Concurrent;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Impl
Expand Down Expand Up @@ -32,19 +34,14 @@ public Task Stop(IModel model)

class WorkPool
{
readonly ConcurrentQueue<Work> _workQueue;
readonly CancellationTokenSource _tokenSource;
readonly Channel<Work> _channel;
readonly ModelBase _model;
readonly CancellationTokenRegistration _tokenRegistration;
volatile TaskCompletionSource<bool> _syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
private Task _worker;

public WorkPool(ModelBase model)
{
_model = model;
_workQueue = new ConcurrentQueue<Work>();
_tokenSource = new CancellationTokenSource();
_tokenRegistration = _tokenSource.Token.Register(() => _syncSource.TrySetCanceled());
_channel = Channel.CreateUnbounded<Work>(new UnboundedChannelOptions { SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false });
}

public void Start()
Expand All @@ -54,35 +51,34 @@ public void Start()

public void Enqueue(Work work)
{
_workQueue.Enqueue(work);
_syncSource.TrySetResult(true);
_channel.Writer.TryWrite(work);
}

async Task Loop()
{
while (_tokenSource.IsCancellationRequested == false)
while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false))
{
try
while (_channel.Reader.TryRead(out Work work))
{
await _syncSource.Task.ConfigureAwait(false);
_syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}
catch (TaskCanceledException)
{
// Swallowing the task cancellation in case we are stopping work.
}
try
{
Task task = work.Execute(_model);
if (!task.IsCompleted)
{
await task.ConfigureAwait(false);
}
}
catch(Exception)
{

while (_workQueue.TryDequeue(out Work work))
{
await work.Execute(_model).ConfigureAwait(false);
}
}
}
}

public Task Stop()
{
_tokenSource.Cancel();
_tokenRegistration.Dispose();
_channel.Writer.Complete();
return _worker;
}
}
Expand Down
53 changes: 24 additions & 29 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,12 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Impl;

using RabbitMQ.Util;
using RabbitMQ.Client.Logging;

namespace RabbitMQ.Client.Framing.Impl
Expand Down Expand Up @@ -664,7 +662,7 @@ private void Init(IFrameHandler fh)
{
if (ShouldTriggerConnectionRecovery(args))
{
_recoveryLoopCommandQueue.Enqueue(RecoveryCommand.BeginAutomaticRecovery);
_recoveryLoopCommandQueue.Writer.TryWrite(RecoveryCommand.BeginAutomaticRecovery);
}
};
lock (_eventLock)
Expand Down Expand Up @@ -1243,9 +1241,7 @@ private enum RecoveryConnectionState
private Task _recoveryTask;
private RecoveryConnectionState _recoveryLoopState = RecoveryConnectionState.Connected;

private readonly AsyncConcurrentQueue<RecoveryCommand> _recoveryLoopCommandQueue = new AsyncConcurrentQueue<RecoveryCommand>();
private readonly CancellationTokenSource _recoveryCancellationToken = new CancellationTokenSource();
private readonly TaskCompletionSource<int> _recoveryLoopComplete = new TaskCompletionSource<int>();
private readonly Channel<RecoveryCommand> _recoveryLoopCommandQueue = Channel.CreateUnbounded<RecoveryCommand>(new UnboundedChannelOptions { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = false });

/// <summary>
/// This is the main loop for the auto-recovery thread.
Expand All @@ -1254,21 +1250,22 @@ private async Task MainRecoveryLoop()
{
try
{
while (!_recoveryCancellationToken.IsCancellationRequested)
while (await _recoveryLoopCommandQueue.Reader.WaitToReadAsync().ConfigureAwait(false))
{
var command = await _recoveryLoopCommandQueue.DequeueAsync(_recoveryCancellationToken.Token).ConfigureAwait(false);

switch (_recoveryLoopState)
while (_recoveryLoopCommandQueue.Reader.TryRead(out RecoveryCommand command))
{
case RecoveryConnectionState.Connected:
RecoveryLoopConnectedHandler(command);
break;
case RecoveryConnectionState.Recovering:
RecoveryLoopRecoveringHandler(command);
break;
default:
ESLog.Warn("RecoveryLoop state is out of range.");
break;
switch (_recoveryLoopState)
{
case RecoveryConnectionState.Connected:
RecoveryLoopConnectedHandler(command);
break;
case RecoveryConnectionState.Recovering:
RecoveryLoopRecoveringHandler(command);
break;
default:
ESLog.Warn("RecoveryLoop state is out of range.");
break;
}
}
}
}
Expand All @@ -1280,8 +1277,6 @@ private async Task MainRecoveryLoop()
{
ESLog.Error("Main recovery loop threw unexpected exception.", e);
}

_recoveryLoopComplete.SetResult(0);
}

/// <summary>
Expand All @@ -1290,8 +1285,10 @@ private async Task MainRecoveryLoop()
/// </summary>
private void StopRecoveryLoop()
{
_recoveryCancellationToken.Cancel();
if (!_recoveryLoopComplete.Task.Wait(_factory.RequestedConnectionTimeout))
_recoveryLoopCommandQueue.Writer.Complete();
Task timeout = Task.Delay(_factory.RequestedConnectionTimeout);

if (Task.WhenAny(_recoveryTask, timeout).Result == timeout)
{
ESLog.Warn("Timeout while trying to stop background AutorecoveringConnection recovery loop.");
}
Expand Down Expand Up @@ -1351,11 +1348,9 @@ private void RecoveryLoopConnectedHandler(RecoveryCommand command)
/// </summary>
private void ScheduleRecoveryRetry()
{
Task.Delay(_factory.NetworkRecoveryInterval)
.ContinueWith(t =>
{
_recoveryLoopCommandQueue.Enqueue(RecoveryCommand.PerformAutomaticRecovery);
});
_ = Task
.Delay(_factory.NetworkRecoveryInterval)
.ContinueWith(t => _recoveryLoopCommandQueue.Writer.TryWrite(RecoveryCommand.PerformAutomaticRecovery));
}
}
}
88 changes: 0 additions & 88 deletions projects/RabbitMQ.Client/util/AsyncConcurrentQueue.cs

This file was deleted.