Skip to content

Commit a4af618

Browse files
authored
Add "allowReconnect" to SignalR CloseMessages (#14908)
1 parent 5dfb923 commit a4af618

File tree

21 files changed

+471
-49
lines changed

21 files changed

+471
-49
lines changed

src/Components/Server/src/BlazorPack/BlazorPackHubProtocol.cs

+30-4
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public bool TryParseMessage(ref ReadOnlySequence<byte> input, IInvocationBinder
7878
message = PingMessage.Instance;
7979
return true;
8080
case HubProtocolConstants.CloseMessageType:
81-
message = CreateCloseMessage(ref reader);
81+
message = CreateCloseMessage(ref reader, itemCount);
8282
return true;
8383
default:
8484
// Future protocol changes can add message types, old clients can ignore them
@@ -196,10 +196,23 @@ private static CancelInvocationMessage CreateCancelInvocationMessage(ref Message
196196
return ApplyHeaders(headers, new CancelInvocationMessage(invocationId));
197197
}
198198

199-
private static CloseMessage CreateCloseMessage(ref MessagePackReader reader)
199+
private static CloseMessage CreateCloseMessage(ref MessagePackReader reader, int itemCount)
200200
{
201201
var error = ReadString(ref reader, "error");
202-
return new CloseMessage(error);
202+
var allowReconnect = false;
203+
204+
if (itemCount > 2)
205+
{
206+
allowReconnect = ReadBoolean(ref reader, "allowReconnect");
207+
}
208+
209+
// An empty string is still an error
210+
if (error == null && !allowReconnect)
211+
{
212+
return CloseMessage.Empty;
213+
}
214+
215+
return new CloseMessage(error, allowReconnect);
203216
}
204217

205218
private static Dictionary<string, string> ReadHeaders(ref MessagePackReader reader)
@@ -515,7 +528,7 @@ private void WriteCancelInvocationMessage(CancelInvocationMessage message, ref M
515528

516529
private void WriteCloseMessage(CloseMessage message, ref MessagePackWriter writer)
517530
{
518-
writer.WriteArrayHeader(2);
531+
writer.WriteArrayHeader(3);
519532
writer.Write(HubProtocolConstants.CloseMessageType);
520533
if (string.IsNullOrEmpty(message.Error))
521534
{
@@ -525,6 +538,8 @@ private void WriteCloseMessage(CloseMessage message, ref MessagePackWriter write
525538
{
526539
writer.Write(message.Error);
527540
}
541+
542+
writer.Write(message.AllowReconnect);
528543
}
529544

530545
private void WritePingMessage(PingMessage _, ref MessagePackWriter writer)
@@ -559,6 +574,17 @@ private static T ApplyHeaders<T>(IDictionary<string, string> source, T destinati
559574
return destination;
560575
}
561576

577+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
578+
private static bool ReadBoolean(ref MessagePackReader reader, string field)
579+
{
580+
if (reader.End || reader.NextMessagePackType != MessagePackType.Boolean)
581+
{
582+
ThrowInvalidDataException(field, "Boolean");
583+
}
584+
585+
return reader.ReadBoolean();
586+
}
587+
562588
[MethodImpl(MethodImplOptions.AggressiveInlining)]
563589
private static int ReadInt32(ref MessagePackReader reader, string field)
564590
{

src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs

+22-13
Original file line numberDiff line numberDiff line change
@@ -878,7 +878,7 @@ private async Task SendWithLock(ConnectionState expectedConnectionState, HubMess
878878
}
879879
}
880880

881-
private async Task<(bool close, Exception exception)> ProcessMessagesAsync(HubMessage message, ConnectionState connectionState, ChannelWriter<InvocationMessage> invocationMessageWriter)
881+
private async Task<CloseMessage> ProcessMessagesAsync(HubMessage message, ConnectionState connectionState, ChannelWriter<InvocationMessage> invocationMessageWriter)
882882
{
883883
Log.ResettingKeepAliveTimer(_logger);
884884
connectionState.ResetTimeout();
@@ -911,21 +911,20 @@ private async Task SendWithLock(ConnectionState expectedConnectionState, HubMess
911911
if (!connectionState.TryGetInvocation(streamItem.InvocationId, out irq))
912912
{
913913
Log.DroppedStreamMessage(_logger, streamItem.InvocationId);
914-
return (close: false, exception: null);
914+
break;
915915
}
916916
await DispatchInvocationStreamItemAsync(streamItem, irq);
917917
break;
918918
case CloseMessage close:
919919
if (string.IsNullOrEmpty(close.Error))
920920
{
921921
Log.ReceivedClose(_logger);
922-
return (close: true, exception: null);
923922
}
924923
else
925924
{
926925
Log.ReceivedCloseWithError(_logger, close.Error);
927-
return (close: true, exception: new HubException($"The server closed the connection with the following error: {close.Error}"));
928926
}
927+
return close;
929928
case PingMessage _:
930929
Log.ReceivedPing(_logger);
931930
// timeout is reset above, on receiving any message
@@ -934,7 +933,7 @@ private async Task SendWithLock(ConnectionState expectedConnectionState, HubMess
934933
throw new InvalidOperationException($"Unexpected message type: {message.GetType().FullName}");
935934
}
936935

937-
return (close: false, exception: null);
936+
return null;
938937
}
939938

940939
private async Task DispatchInvocationAsync(InvocationMessage invocation)
@@ -1150,25 +1149,33 @@ async Task StartProcessingInvocationMessages(ChannelReader<InvocationMessage> in
11501149
{
11511150
Log.ProcessingMessage(_logger, buffer.Length);
11521151

1153-
var close = false;
1152+
CloseMessage closeMessage = null;
11541153

11551154
while (_protocol.TryParseMessage(ref buffer, connectionState, out var message))
11561155
{
1157-
Exception exception;
1158-
11591156
// We have data, process it
1160-
(close, exception) = await ProcessMessagesAsync(message, connectionState, invocationMessageChannel.Writer);
1161-
if (close)
1157+
closeMessage = await ProcessMessagesAsync(message, connectionState, invocationMessageChannel.Writer);
1158+
1159+
if (closeMessage != null)
11621160
{
11631161
// Closing because we got a close frame, possibly with an error in it.
1164-
connectionState.CloseException = exception;
1165-
connectionState.Stopping = true;
1162+
if (closeMessage.Error != null)
1163+
{
1164+
connectionState.CloseException = new HubException($"The server closed the connection with the following error: {closeMessage.Error}");
1165+
}
1166+
1167+
// Stopping being true indicates the client shouldn't try to reconnect even if automatic reconnects are enabled.
1168+
if (!closeMessage.AllowReconnect)
1169+
{
1170+
connectionState.Stopping = true;
1171+
}
1172+
11661173
break;
11671174
}
11681175
}
11691176

11701177
// If we're closing stop everything
1171-
if (close)
1178+
if (closeMessage != null)
11721179
{
11731180
break;
11741181
}
@@ -1637,6 +1644,8 @@ private class ConnectionState : IInvocationBinder
16371644
public Exception CloseException { get; set; }
16381645
public CancellationToken UploadStreamToken { get; set; }
16391646

1647+
// Indicates the connection is stopping AND the client should NOT attempt to reconnect even if automatic reconnects are enabled.
1648+
// This means either HubConnection.DisposeAsync/StopAsync was called OR a CloseMessage with AllowReconnects set to false was received.
16401649
public bool Stopping
16411650
{
16421651
get => _stopping;

src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.Reconnect.cs

+150
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,156 @@ bool ExpectedErrors(WriteContext writeContext)
368368
}
369369
}
370370

371+
[Fact]
372+
public async Task CanBeInducedByCloseMessageWithAllowReconnectSet()
373+
{
374+
bool ExpectedErrors(WriteContext writeContext)
375+
{
376+
return writeContext.LoggerName == typeof(HubConnection).FullName &&
377+
(writeContext.EventId.Name == "ReceivedCloseWithError" ||
378+
writeContext.EventId.Name == "ReconnectingWithError");
379+
}
380+
381+
var failReconnectTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
382+
383+
using (StartVerifiableLog(ExpectedErrors))
384+
{
385+
var builder = new HubConnectionBuilder().WithLoggerFactory(LoggerFactory).WithUrl("http://example.com");
386+
var testConnectionFactory = default(ReconnectingConnectionFactory);
387+
388+
testConnectionFactory = new ReconnectingConnectionFactory(() => new TestConnection());
389+
builder.Services.AddSingleton<IConnectionFactory>(testConnectionFactory);
390+
391+
var retryContexts = new List<RetryContext>();
392+
var mockReconnectPolicy = new Mock<IRetryPolicy>();
393+
mockReconnectPolicy.Setup(p => p.NextRetryDelay(It.IsAny<RetryContext>())).Returns<RetryContext>(context =>
394+
{
395+
retryContexts.Add(context);
396+
return TimeSpan.Zero;
397+
});
398+
builder.WithAutomaticReconnect(mockReconnectPolicy.Object);
399+
400+
await using var hubConnection = builder.Build();
401+
var reconnectingCount = 0;
402+
var reconnectedCount = 0;
403+
var reconnectingErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
404+
var reconnectedConnectionIdTcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
405+
var closedErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
406+
407+
hubConnection.Reconnecting += error =>
408+
{
409+
reconnectingCount++;
410+
reconnectingErrorTcs.SetResult(error);
411+
return Task.CompletedTask;
412+
};
413+
414+
hubConnection.Reconnected += connectionId =>
415+
{
416+
reconnectedCount++;
417+
reconnectedConnectionIdTcs.SetResult(connectionId);
418+
return Task.CompletedTask;
419+
};
420+
421+
hubConnection.Closed += error =>
422+
{
423+
closedErrorTcs.SetResult(error);
424+
return Task.CompletedTask;
425+
};
426+
427+
await hubConnection.StartAsync().OrTimeout();
428+
429+
var currentConnection = await testConnectionFactory.GetNextOrCurrentTestConnection();
430+
await currentConnection.ReceiveJsonMessage(new
431+
{
432+
type = HubProtocolConstants.CloseMessageType,
433+
error = "Error!",
434+
allowReconnect = true,
435+
});
436+
437+
var reconnectingException = await reconnectingErrorTcs.Task.OrTimeout();
438+
var expectedMessage = "The server closed the connection with the following error: Error!";
439+
440+
Assert.Equal(expectedMessage, reconnectingException.Message);
441+
Assert.Single(retryContexts);
442+
Assert.Equal(expectedMessage, retryContexts[0].RetryReason.Message);
443+
Assert.Equal(0, retryContexts[0].PreviousRetryCount);
444+
Assert.Equal(TimeSpan.Zero, retryContexts[0].ElapsedTime);
445+
446+
await reconnectedConnectionIdTcs.Task.OrTimeout();
447+
448+
await hubConnection.StopAsync().OrTimeout();
449+
450+
var closeError = await closedErrorTcs.Task.OrTimeout();
451+
Assert.Null(closeError);
452+
Assert.Equal(1, reconnectingCount);
453+
Assert.Equal(1, reconnectedCount);
454+
}
455+
}
456+
457+
[Fact]
458+
public async Task CannotBeInducedByCloseMessageWithAllowReconnectOmitted()
459+
{
460+
bool ExpectedErrors(WriteContext writeContext)
461+
{
462+
return writeContext.LoggerName == typeof(HubConnection).FullName &&
463+
(writeContext.EventId.Name == "ReceivedCloseWithError" ||
464+
writeContext.EventId.Name == "ShutdownWithError");
465+
}
466+
467+
var failReconnectTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
468+
469+
using (StartVerifiableLog(ExpectedErrors))
470+
{
471+
var builder = new HubConnectionBuilder().WithLoggerFactory(LoggerFactory).WithUrl("http://example.com");
472+
var testConnectionFactory = default(ReconnectingConnectionFactory);
473+
474+
testConnectionFactory = new ReconnectingConnectionFactory(() => new TestConnection());
475+
builder.Services.AddSingleton<IConnectionFactory>(testConnectionFactory);
476+
477+
var reconnectingCount = 0;
478+
var nextRetryDelayCallCount = 0;
479+
var closedErrorTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
480+
481+
var mockReconnectPolicy = new Mock<IRetryPolicy>();
482+
mockReconnectPolicy.Setup(p => p.NextRetryDelay(It.IsAny<RetryContext>())).Returns<RetryContext>(context =>
483+
{
484+
nextRetryDelayCallCount++;
485+
return TimeSpan.Zero;
486+
});
487+
488+
builder.WithAutomaticReconnect(mockReconnectPolicy.Object);
489+
490+
await using var hubConnection = builder.Build();
491+
492+
hubConnection.Reconnecting += error =>
493+
{
494+
reconnectingCount++;
495+
return Task.CompletedTask;
496+
};
497+
498+
hubConnection.Closed += error =>
499+
{
500+
closedErrorTcs.SetResult(error);
501+
return Task.CompletedTask;
502+
};
503+
504+
await hubConnection.StartAsync().OrTimeout();
505+
506+
var currentConnection = await testConnectionFactory.GetNextOrCurrentTestConnection();
507+
await currentConnection.ReceiveJsonMessage(new
508+
{
509+
type = HubProtocolConstants.CloseMessageType,
510+
error = "Error!",
511+
});
512+
513+
var closeError = await closedErrorTcs.Task.OrTimeout();
514+
515+
Assert.Equal("The server closed the connection with the following error: Error!", closeError.Message);
516+
Assert.Equal(0, nextRetryDelayCallCount);
517+
Assert.Equal(0, reconnectingCount);
518+
}
519+
}
520+
371521
[Fact]
372522
public async Task EventsNotFiredIfFirstRetryDelayIsNull()
373523
{

src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts

+1
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ export class MessagePackHubProtocol implements IHubProtocol {
124124

125125
return {
126126
// Close messages have no headers.
127+
allowReconnect: properties.length >= 3 ? properties[2] : undefined,
127128
error: properties[1],
128129
type: MessageType.Close,
129130
} as HubMessage;

src/SignalR/clients/ts/signalr/src/HubConnection.ts

+12-2
Original file line numberDiff line numberDiff line change
@@ -545,8 +545,18 @@ export class HubConnection {
545545
case MessageType.Close:
546546
this.logger.log(LogLevel.Information, "Close message received from server.");
547547

548-
// We don't want to wait on the stop itself.
549-
this.stopPromise = this.stopInternal(message.error ? new Error("Server returned an error on close: " + message.error) : undefined);
548+
const error = message.error ? new Error("Server returned an error on close: " + message.error) : undefined;
549+
550+
if (message.allowReconnect === true) {
551+
// It feels wrong not to await connection.stop() here, but processIncomingData is called as part of an onreceive callback which is not async,
552+
// this is already the behavior for serverTimeout(), and HttpConnection.Stop() should catch and log all possible exceptions.
553+
554+
// tslint:disable-next-line:no-floating-promises
555+
this.connection.stop(error);
556+
} else {
557+
// We cannot await stopInternal() here, but subsequent calls to stop() will await this if stopInternal() is still ongoing.
558+
this.stopPromise = this.stopInternal(error);
559+
}
550560

551561
break;
552562
default:

src/SignalR/clients/ts/signalr/src/IHubProtocol.ts

+3
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,9 @@ export interface CloseMessage extends HubMessageBase {
131131
* If this property is undefined, the connection was closed normally and without error.
132132
*/
133133
readonly error?: string;
134+
135+
/** If true, clients with automatic reconnects enabled should attempt to reconnect after receiving the CloseMessage. Otherwise, they should not. */
136+
readonly allowReconnect?: boolean;
134137
}
135138

136139
/** A hub message sent to request that a streaming invocation be canceled. */

0 commit comments

Comments
 (0)