Skip to content

Commit 96f55fc

Browse files
authored
Wrap the PipeWriter used by Kestrel's output writing logic (#12081)
1 parent fb12909 commit 96f55fc

18 files changed

+1077
-248
lines changed

src/Servers/Kestrel/Core/src/Internal/Http/Http1OutputProducer.cs

+7-6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
using Microsoft.AspNetCore.Internal;
1313
using Microsoft.AspNetCore.Server.Kestrel.Core.Features;
1414
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
15+
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers;
1516

1617
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
1718
{
@@ -45,7 +46,7 @@ internal class Http1OutputProducer : IHttpOutputProducer, IHttpOutputAborter, ID
4546
private long _unflushedBytes;
4647
private int _currentMemoryPrefixBytes;
4748

48-
private readonly PipeWriter _pipeWriter;
49+
private readonly ConcurrentPipeWriter _pipeWriter;
4950
private IMemoryOwner<byte> _fakeMemoryOwner;
5051

5152
// Chunked responses need to be treated uniquely when using GetMemory + Advance.
@@ -81,18 +82,16 @@ public Http1OutputProducer(
8182
IHttpMinResponseDataRateFeature minResponseDataRateFeature,
8283
MemoryPool<byte> memoryPool)
8384
{
84-
_pipeWriter = pipeWriter;
85+
// Allow appending more data to the PipeWriter when a flush is pending.
86+
_pipeWriter = new ConcurrentPipeWriter(pipeWriter, memoryPool);
8587
_connectionId = connectionId;
8688
_connectionContext = connectionContext;
8789
_log = log;
8890
_minResponseDataRateFeature = minResponseDataRateFeature;
89-
_flusher = new TimingPipeFlusher(pipeWriter, timeoutControl, log);
91+
_flusher = new TimingPipeFlusher(_pipeWriter, timeoutControl, log);
9092
_memoryPool = memoryPool;
9193
}
9294

93-
// For tests
94-
internal PipeWriter PipeWriter => _pipeWriter;
95-
9695
public Task WriteDataAsync(ReadOnlySpan<byte> buffer, CancellationToken cancellationToken = default)
9796
{
9897
if (cancellationToken.IsCancellationRequested)
@@ -408,6 +407,8 @@ public void Dispose()
408407
{
409408
lock (_contextLock)
410409
{
410+
_pipeWriter.Abort();
411+
411412
if (_fakeMemoryOwner != null)
412413
{
413414
_fakeMemoryOwner.Dispose();

src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs

+1
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public Http2Connection(HttpConnectionContext context)
8787
context.TimeoutControl,
8888
httpLimits.MinResponseDataRate,
8989
context.ConnectionId,
90+
context.MemoryPool,
9091
context.ServiceContext.Log);
9192

9293
_hpackDecoder = new HPackDecoder(http2Limits.HeaderTableSize, http2Limits.MaxRequestHeaderFieldSize);

src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs

+6-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl;
1616
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.HPack;
1717
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
18+
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers;
1819

1920
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
2021
{
@@ -26,7 +27,7 @@ internal class Http2FrameWriter
2627
private readonly object _writeLock = new object();
2728
private readonly Http2Frame _outgoingFrame;
2829
private readonly HPackEncoder _hpackEncoder = new HPackEncoder();
29-
private readonly PipeWriter _outputWriter;
30+
private readonly ConcurrentPipeWriter _outputWriter;
3031
private readonly ConnectionContext _connectionContext;
3132
private readonly Http2Connection _http2Connection;
3233
private readonly OutputFlowControl _connectionOutputFlowControl;
@@ -51,9 +52,11 @@ public Http2FrameWriter(
5152
ITimeoutControl timeoutControl,
5253
MinDataRate minResponseDataRate,
5354
string connectionId,
55+
MemoryPool<byte> memoryPool,
5456
IKestrelTrace log)
5557
{
56-
_outputWriter = outputPipeWriter;
58+
// Allow appending more data to the PipeWriter when a flush is pending.
59+
_outputWriter = new ConcurrentPipeWriter(outputPipeWriter, memoryPool);
5760
_connectionContext = connectionContext;
5861
_http2Connection = http2Connection;
5962
_connectionOutputFlowControl = connectionOutputFlowControl;
@@ -89,6 +92,7 @@ public void Complete()
8992

9093
_completed = true;
9194
_connectionOutputFlowControl.Abort();
95+
_outputWriter.Abort();
9296
}
9397
}
9498

src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs

+22-15
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
1313
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl;
1414
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
15+
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers;
1516
using Microsoft.Extensions.Logging;
1617

1718
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
@@ -29,7 +30,8 @@ internal class Http2OutputProducer : IHttpOutputProducer, IHttpOutputAborter
2930
private readonly MemoryPool<byte> _memoryPool;
3031
private readonly Http2Stream _stream;
3132
private readonly object _dataWriterLock = new object();
32-
private readonly Pipe _dataPipe;
33+
private readonly PipeWriter _pipeWriter;
34+
private readonly PipeReader _pipeReader;
3335
private readonly ValueTask<FlushResult> _dataWriteProcessingTask;
3436
private bool _startedWritingDataFrames;
3537
private bool _completed;
@@ -43,7 +45,6 @@ public Http2OutputProducer(
4345
int streamId,
4446
Http2FrameWriter frameWriter,
4547
StreamOutputFlowControl flowControl,
46-
ITimeoutControl timeoutControl,
4748
MemoryPool<byte> pool,
4849
Http2Stream stream,
4950
IKestrelTrace log)
@@ -55,8 +56,14 @@ public Http2OutputProducer(
5556
_stream = stream;
5657
_log = log;
5758

58-
_dataPipe = CreateDataPipe(pool);
59-
_flusher = new TimingPipeFlusher(_dataPipe.Writer, timeoutControl, log);
59+
var pipe = CreateDataPipe(pool);
60+
61+
_pipeWriter = new ConcurrentPipeWriter(pipe.Writer, pool);
62+
_pipeReader = pipe.Reader;
63+
64+
// No need to pass in timeoutControl here, since no minDataRates are passed to the TimingPipeFlusher.
65+
// The minimum output data rate is enforced at the connection level by Http2FrameWriter.
66+
_flusher = new TimingPipeFlusher(_pipeWriter, timeoutControl: null, log);
6067
_dataWriteProcessingTask = ProcessDataWrites();
6168
}
6269

@@ -193,7 +200,7 @@ public Task WriteDataAsync(ReadOnlySpan<byte> data, CancellationToken cancellati
193200

194201
_startedWritingDataFrames = true;
195202

196-
_dataPipe.Writer.Write(data);
203+
_pipeWriter.Write(data);
197204
return _flusher.FlushAsync(this, cancellationToken).GetAsTask();
198205
}
199206
}
@@ -210,7 +217,7 @@ public ValueTask<FlushResult> WriteStreamSuffixAsync()
210217
_completed = true;
211218
_suffixSent = true;
212219

213-
_dataPipe.Writer.Complete();
220+
_pipeWriter.Complete();
214221
return _dataWriteProcessingTask;
215222
}
216223
}
@@ -239,7 +246,7 @@ public void Advance(int bytes)
239246

240247
_startedWritingDataFrames = true;
241248

242-
_dataPipe.Writer.Advance(bytes);
249+
_pipeWriter.Advance(bytes);
243250
}
244251
}
245252

@@ -254,7 +261,7 @@ public Span<byte> GetSpan(int sizeHint = 0)
254261
return GetFakeMemory(sizeHint).Span;
255262
}
256263

257-
return _dataPipe.Writer.GetSpan(sizeHint);
264+
return _pipeWriter.GetSpan(sizeHint);
258265
}
259266
}
260267

@@ -269,7 +276,7 @@ public Memory<byte> GetMemory(int sizeHint = 0)
269276
return GetFakeMemory(sizeHint);
270277
}
271278

272-
return _dataPipe.Writer.GetMemory(sizeHint);
279+
return _pipeWriter.GetMemory(sizeHint);
273280
}
274281
}
275282

@@ -282,7 +289,7 @@ public void CancelPendingFlush()
282289
return;
283290
}
284291

285-
_dataPipe.Writer.CancelPendingFlush();
292+
_pipeWriter.CancelPendingFlush();
286293
}
287294
}
288295

@@ -306,7 +313,7 @@ public ValueTask<FlushResult> WriteDataToPipeAsync(ReadOnlySpan<byte> data, Canc
306313

307314
_startedWritingDataFrames = true;
308315

309-
_dataPipe.Writer.Write(data);
316+
_pipeWriter.Write(data);
310317
return _flusher.FlushAsync(this, cancellationToken);
311318
}
312319
}
@@ -345,7 +352,7 @@ public void Stop()
345352
// Complete with an exception to prevent an end of stream data frame from being sent without an
346353
// explicit call to WriteStreamSuffixAsync. ConnectionAbortedExceptions are swallowed, so the
347354
// message doesn't matter
348-
_dataPipe.Writer.Complete(new OperationCanceledException());
355+
_pipeWriter.Complete(new OperationCanceledException());
349356

350357
_frameWriter.AbortPendingStreamDataWrites(_flowControl);
351358
}
@@ -364,7 +371,7 @@ private async ValueTask<FlushResult> ProcessDataWrites()
364371

365372
do
366373
{
367-
readResult = await _dataPipe.Reader.ReadAsync();
374+
readResult = await _pipeReader.ReadAsync();
368375

369376
if (readResult.IsCompleted && _stream.ResponseTrailers?.Count > 0)
370377
{
@@ -393,7 +400,7 @@ private async ValueTask<FlushResult> ProcessDataWrites()
393400
flushResult = await _frameWriter.WriteDataAsync(_streamId, _flowControl, readResult.Buffer, endStream: readResult.IsCompleted);
394401
}
395402

396-
_dataPipe.Reader.AdvanceTo(readResult.Buffer.End);
403+
_pipeReader.AdvanceTo(readResult.Buffer.End);
397404
} while (!readResult.IsCompleted);
398405
}
399406
catch (OperationCanceledException)
@@ -405,7 +412,7 @@ private async ValueTask<FlushResult> ProcessDataWrites()
405412
_log.LogCritical(ex, nameof(Http2OutputProducer) + "." + nameof(ProcessDataWrites) + " observed an unexpected exception.");
406413
}
407414

408-
_dataPipe.Reader.Complete();
415+
_pipeReader.Complete();
409416

410417
return flushResult;
411418

src/Servers/Kestrel/Core/src/Internal/Http2/Http2Stream.cs

-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ public Http2Stream(Http2StreamContext context)
5151
context.StreamId,
5252
context.FrameWriter,
5353
_outputFlowControl,
54-
context.TimeoutControl,
5554
context.MemoryPool,
5655
this,
5756
context.ServiceContext.Log);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System.Buffers;
5+
using System.Diagnostics;
6+
using System.Runtime.CompilerServices;
7+
8+
namespace System.IO.Pipelines
9+
{
10+
// Copied from https://github.com/dotnet/corefx/blob/de3902bb56f1254ec1af4bf7d092fc2c048734cc/src/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegment.cs
11+
internal sealed class BufferSegment : ReadOnlySequenceSegment<byte>
12+
{
13+
private object _memoryOwner;
14+
private BufferSegment _next;
15+
private int _end;
16+
17+
/// <summary>
18+
/// The End represents the offset into AvailableMemory where the range of "active" bytes ends. At the point when the block is leased
19+
/// the End is guaranteed to be equal to Start. The value of Start may be assigned anywhere between 0 and
20+
/// Buffer.Length, and must be equal to or less than End.
21+
/// </summary>
22+
public int End
23+
{
24+
get => _end;
25+
set
26+
{
27+
Debug.Assert(value <= AvailableMemory.Length);
28+
29+
_end = value;
30+
Memory = AvailableMemory.Slice(0, value);
31+
}
32+
}
33+
34+
/// <summary>
35+
/// Reference to the next block of data when the overall "active" bytes spans multiple blocks. At the point when the block is
36+
/// leased Next is guaranteed to be null. Start, End, and Next are used together in order to create a linked-list of discontiguous
37+
/// working memory. The "active" memory is grown when bytes are copied in, End is increased, and Next is assigned. The "active"
38+
/// memory is shrunk when bytes are consumed, Start is increased, and blocks are returned to the pool.
39+
/// </summary>
40+
public BufferSegment NextSegment
41+
{
42+
get => _next;
43+
set
44+
{
45+
Next = value;
46+
_next = value;
47+
}
48+
}
49+
50+
public void SetOwnedMemory(IMemoryOwner<byte> memoryOwner)
51+
{
52+
_memoryOwner = memoryOwner;
53+
AvailableMemory = memoryOwner.Memory;
54+
}
55+
56+
public void SetOwnedMemory(byte[] arrayPoolBuffer)
57+
{
58+
_memoryOwner = arrayPoolBuffer;
59+
AvailableMemory = arrayPoolBuffer;
60+
}
61+
62+
public void SetUnownedMemory(Memory<byte> memory)
63+
{
64+
AvailableMemory = memory;
65+
}
66+
67+
public void ResetMemory()
68+
{
69+
if (_memoryOwner is IMemoryOwner<byte> owner)
70+
{
71+
owner.Dispose();
72+
}
73+
else if (_memoryOwner is byte[] array)
74+
{
75+
ArrayPool<byte>.Shared.Return(array);
76+
}
77+
78+
// Order of below field clears is significant as it clears in a sequential order
79+
// https://github.com/dotnet/corefx/pull/35256#issuecomment-462800477
80+
Next = null;
81+
RunningIndex = 0;
82+
Memory = default;
83+
_memoryOwner = null;
84+
_next = null;
85+
_end = 0;
86+
AvailableMemory = default;
87+
}
88+
89+
// Exposed for testing
90+
internal object MemoryOwner => _memoryOwner;
91+
92+
public Memory<byte> AvailableMemory { get; private set; }
93+
94+
public int Length => End;
95+
96+
public int WritableBytes
97+
{
98+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
99+
get => AvailableMemory.Length - End;
100+
}
101+
102+
public void SetNext(BufferSegment segment)
103+
{
104+
Debug.Assert(segment != null);
105+
Debug.Assert(Next == null);
106+
107+
NextSegment = segment;
108+
109+
segment = this;
110+
111+
while (segment.Next != null)
112+
{
113+
segment.NextSegment.RunningIndex = segment.RunningIndex + segment.Length;
114+
segment = segment.NextSegment;
115+
}
116+
}
117+
118+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
119+
internal static long GetLength(BufferSegment startSegment, int startIndex, BufferSegment endSegment, int endIndex)
120+
{
121+
return (endSegment.RunningIndex + (uint)endIndex) - (startSegment.RunningIndex + (uint)startIndex);
122+
}
123+
124+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
125+
internal static long GetLength(long startPosition, BufferSegment endSegment, int endIndex)
126+
{
127+
return (endSegment.RunningIndex + (uint)endIndex) - startPosition;
128+
}
129+
}
130+
}

0 commit comments

Comments
 (0)