Skip to content

Commit fdbc4d3

Browse files
Fix a few issues with PipeStream (#1399)
Apply similar treatment to PipeStream as #1322 did to ShellStream PipeStream now behaves much more Stream-like. In particular, it performs partial reads (instead of blocking until a certain amount of data is available), blocks until data is available (instead of returning 0 prematurely) and removes the Stream-unlike properties `BlockLastReadBuffer` and `MaxBufferLength`. Sadly I gave up trying to make a benchmark compatible with all the quirks of the previous implementation, but a dumb throughput test (reading and writing simultaneously) shows about 5.2GB/s with this implementation compared to 140MB/s previously. Some cleanup of its usage in the library followed. Co-authored-by: Wojciech Nagórski <wojtpl2@gmail.com>
1 parent 49aed6f commit fdbc4d3

16 files changed

+331
-691
lines changed

src/Renci.SshNet/Common/PipeStream.cs

+118-295
Large diffs are not rendered by default.

src/Renci.SshNet/ScpClient.cs

+6
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ public void Upload(Stream source, string path)
262262
using (var channel = Session.CreateChannelSession())
263263
{
264264
channel.DataReceived += (sender, e) => input.Write(e.Data, 0, e.Data.Length);
265+
channel.Closed += (sender, e) => input.Dispose();
265266
channel.Open();
266267

267268
// Pass only the directory part of the path to the server, and use the (hidden) -d option to signal
@@ -307,6 +308,7 @@ public void Upload(FileInfo fileInfo, string path)
307308
using (var channel = Session.CreateChannelSession())
308309
{
309310
channel.DataReceived += (sender, e) => input.Write(e.Data, 0, e.Data.Length);
311+
channel.Closed += (sender, e) => input.Dispose();
310312
channel.Open();
311313

312314
// Pass only the directory part of the path to the server, and use the (hidden) -d option to signal
@@ -364,6 +366,7 @@ public void Upload(DirectoryInfo directoryInfo, string path)
364366
using (var channel = Session.CreateChannelSession())
365367
{
366368
channel.DataReceived += (sender, e) => input.Write(e.Data, 0, e.Data.Length);
369+
channel.Closed += (sender, e) => input.Dispose();
367370
channel.Open();
368371

369372
// start copy with the following options:
@@ -413,6 +416,7 @@ public void Download(string filename, FileInfo fileInfo)
413416
using (var channel = Session.CreateChannelSession())
414417
{
415418
channel.DataReceived += (sender, e) => input.Write(e.Data, 0, e.Data.Length);
419+
channel.Closed += (sender, e) => input.Dispose();
416420
channel.Open();
417421

418422
// Send channel command request
@@ -459,6 +463,7 @@ public void Download(string directoryName, DirectoryInfo directoryInfo)
459463
using (var channel = Session.CreateChannelSession())
460464
{
461465
channel.DataReceived += (sender, e) => input.Write(e.Data, 0, e.Data.Length);
466+
channel.Closed += (sender, e) => input.Dispose();
462467
channel.Open();
463468

464469
// Send channel command request
@@ -505,6 +510,7 @@ public void Download(string filename, Stream destination)
505510
using (var channel = Session.CreateChannelSession())
506511
{
507512
channel.DataReceived += (sender, e) => input.Write(e.Data, 0, e.Data.Length);
513+
channel.Closed += (sender, e) => input.Dispose();
508514
channel.Open();
509515

510516
// Send channel command request

src/Renci.SshNet/ShellStream.cs

+1-3
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,7 @@ public override bool CanWrite
182182
get { return !_disposed; }
183183
}
184184

185-
/// <summary>
186-
/// This method does nothing.
187-
/// </summary>
185+
/// <inheritdoc/>
188186
public override void Flush()
189187
{
190188
ThrowIfDisposed();

src/Renci.SshNet/SshCommand.cs

+47-60
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ public class SshCommand : IDisposable
2828
private EventWaitHandle _sessionErrorOccuredWaitHandle;
2929
private EventWaitHandle _commandCancelledWaitHandle;
3030
private Exception _exception;
31-
private StringBuilder _result;
32-
private StringBuilder _error;
31+
private string _result;
32+
private string _error;
3333
private bool _hasError;
3434
private bool _isDisposed;
3535
private bool _isCancelled;
@@ -109,21 +109,22 @@ public string Result
109109
{
110110
get
111111
{
112-
_result ??= new StringBuilder();
112+
if (_result is not null)
113+
{
114+
return _result;
115+
}
113116

114-
if (OutputStream != null && OutputStream.Length > 0)
117+
if (OutputStream is null)
115118
{
116-
using (var sr = new StreamReader(OutputStream,
117-
_encoding,
118-
detectEncodingFromByteOrderMarks: true,
119-
bufferSize: 1024,
120-
leaveOpen: true))
121-
{
122-
_ = _result.Append(sr.ReadToEnd());
123-
}
119+
return string.Empty;
124120
}
125121

126-
return _result.ToString();
122+
using (var sr = new StreamReader(OutputStream,
123+
_encoding,
124+
detectEncodingFromByteOrderMarks: true))
125+
{
126+
return _result = sr.ReadToEnd();
127+
}
127128
}
128129
}
129130

@@ -134,26 +135,22 @@ public string Error
134135
{
135136
get
136137
{
137-
if (_hasError)
138+
if (_error is not null)
139+
{
140+
return _error;
141+
}
142+
143+
if (ExtendedOutputStream is null || !_hasError)
138144
{
139-
_error ??= new StringBuilder();
140-
141-
if (ExtendedOutputStream != null && ExtendedOutputStream.Length > 0)
142-
{
143-
using (var sr = new StreamReader(ExtendedOutputStream,
144-
_encoding,
145-
detectEncodingFromByteOrderMarks: true,
146-
bufferSize: 1024,
147-
leaveOpen: true))
148-
{
149-
_ = _error.Append(sr.ReadToEnd());
150-
}
151-
}
152-
153-
return _error.ToString();
145+
return string.Empty;
154146
}
155147

156-
return string.Empty;
148+
using (var sr = new StreamReader(ExtendedOutputStream,
149+
_encoding,
150+
detectEncodingFromByteOrderMarks: true))
151+
{
152+
return _error = sr.ReadToEnd();
153+
}
157154
}
158155
}
159156

@@ -265,26 +262,16 @@ public IAsyncResult BeginExecute(AsyncCallback callback, object state)
265262
throw new ArgumentException("CommandText property is empty.");
266263
}
267264

268-
var outputStream = OutputStream;
269-
if (outputStream is not null)
270-
{
271-
outputStream.Dispose();
272-
OutputStream = null;
273-
}
274-
275-
var extendedOutputStream = ExtendedOutputStream;
276-
if (extendedOutputStream is not null)
277-
{
278-
extendedOutputStream.Dispose();
279-
ExtendedOutputStream = null;
280-
}
265+
OutputStream?.Dispose();
266+
ExtendedOutputStream?.Dispose();
281267

282268
// Initialize output streams
283269
OutputStream = new PipeStream();
284270
ExtendedOutputStream = new PipeStream();
285271

286272
_result = null;
287273
_error = null;
274+
_hasError = false;
288275
_callback = callback;
289276

290277
_channel = CreateChannel();
@@ -341,13 +328,21 @@ public string EndExecute(IAsyncResult asyncResult)
341328

342329
_inputStream?.Close();
343330

344-
// wait for operation to complete (or time out)
345-
WaitOnHandle(_asyncResult.AsyncWaitHandle);
331+
try
332+
{
333+
// wait for operation to complete (or time out)
334+
WaitOnHandle(_asyncResult.AsyncWaitHandle);
335+
}
336+
finally
337+
{
338+
UnsubscribeFromEventsAndDisposeChannel(_channel);
339+
_channel = null;
346340

347-
UnsubscribeFromEventsAndDisposeChannel(_channel);
348-
_channel = null;
341+
OutputStream?.Dispose();
342+
ExtendedOutputStream?.Dispose();
349343

350-
commandAsyncResult.EndCalled = true;
344+
commandAsyncResult.EndCalled = true;
345+
}
351346

352347
if (!_isCancelled)
353348
{
@@ -437,8 +432,8 @@ private void Session_ErrorOccured(object sender, ExceptionEventArgs e)
437432

438433
private void SetAsyncComplete()
439434
{
440-
OutputStream?.Flush();
441-
ExtendedOutputStream?.Flush();
435+
OutputStream?.Dispose();
436+
ExtendedOutputStream?.Dispose();
442437

443438
_asyncResult.IsCompleted = true;
444439

@@ -480,11 +475,7 @@ private void Channel_RequestReceived(object sender, ChannelRequestEventArgs e)
480475

481476
private void Channel_ExtendedDataReceived(object sender, ChannelExtendedDataEventArgs e)
482477
{
483-
if (ExtendedOutputStream != null)
484-
{
485-
ExtendedOutputStream.Write(e.Data, 0, e.Data.Length);
486-
ExtendedOutputStream.Flush();
487-
}
478+
ExtendedOutputStream?.Write(e.Data, 0, e.Data.Length);
488479

489480
if (e.DataTypeCode == 1)
490481
{
@@ -494,11 +485,7 @@ private void Channel_ExtendedDataReceived(object sender, ChannelExtendedDataEven
494485

495486
private void Channel_DataReceived(object sender, ChannelDataEventArgs e)
496487
{
497-
if (OutputStream != null)
498-
{
499-
OutputStream.Write(e.Data, 0, e.Data.Length);
500-
OutputStream.Flush();
501-
}
488+
OutputStream?.Write(e.Data, 0, e.Data.Length);
502489

503490
if (_asyncResult != null)
504491
{

test/Renci.SshNet.IntegrationBenchmarks/SshClientBenchmark.cs

+12
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,18 @@ public string RunCommand()
7272
return _sshClient!.RunCommand("echo $'test !@#$%^&*()_+{}:,./<>[];\\|'").Result;
7373
}
7474

75+
[Benchmark]
76+
public string RunBigCommand()
77+
{
78+
using var command = _sshClient!.CreateCommand("head -c 10000000 /dev/urandom | base64"); // 10MB of data please
79+
80+
var asyncResult = command.BeginExecute();
81+
82+
command.OutputStream.CopyTo(Stream.Null);
83+
84+
return command.EndExecute(asyncResult);
85+
}
86+
7587
[Benchmark]
7688
public string ShellStreamReadLine()
7789
{

test/Renci.SshNet.IntegrationTests/SshClientTests.cs

+22
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,28 @@ public void Echo_Command_with_all_characters()
2222
Assert.AreEqual("test !@#$%^&*()_+{}:,./<>[];\\|\n", response.Result);
2323
}
2424

25+
[TestMethod]
26+
public void Test_BigCommand()
27+
{
28+
using var command = _sshClient.CreateCommand("head -c 10000000 /dev/urandom | base64"); // 10MB of data please
29+
30+
var asyncResult = command.BeginExecute();
31+
32+
long totalBytesRead = 0;
33+
int bytesRead;
34+
byte[] buffer = new byte[4096];
35+
36+
while ((bytesRead = command.OutputStream.Read(buffer, 0, buffer.Length)) != 0)
37+
{
38+
totalBytesRead += bytesRead;
39+
}
40+
41+
var result = command.EndExecute(asyncResult);
42+
43+
Assert.AreEqual(13_508_775, totalBytesRead);
44+
Assert.AreEqual(0, result.Length);
45+
}
46+
2547
[TestMethod]
2648
public void Send_InputStream_to_Command()
2749
{

test/Renci.SshNet.IntegrationTests/SshTests.cs

+12-12
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public void Ssh_CreateShell()
172172
}
173173

174174
[TestMethod]
175-
public void Ssh_Command_IntermittendOutput_EndExecute()
175+
public void Ssh_Command_IntermittentOutput_EndExecute()
176176
{
177177
const string remoteFile = "/home/sshnet/test.sh";
178178

@@ -229,16 +229,8 @@ public void Ssh_Command_IntermittendOutput_EndExecute()
229229
}
230230
}
231231

232-
/// <summary>
233-
/// Ignored for now, because:
234-
/// * OutputStream.Read(...) does not block when no data is available
235-
/// * SshCommand.(Begin)Execute consumes *OutputStream*, advancing its position.
236-
///
237-
/// https://github.com/sshnet/SSH.NET/issues/650
238-
/// </summary>
239232
[TestMethod]
240-
[Ignore]
241-
public void Ssh_Command_IntermittendOutput_OutputStream()
233+
public void Ssh_Command_IntermittentOutput_OutputStream()
242234
{
243235
const string remoteFile = "/home/sshnet/test.sh";
244236

@@ -297,8 +289,16 @@ public void Ssh_Command_IntermittendOutput_OutputStream()
297289

298290
var actualResult = command.EndExecute(asyncResult);
299291

300-
Assert.AreEqual(expectedResult, actualResult);
301-
Assert.AreEqual(expectedResult, command.Result);
292+
// command.Result (also returned from EndExecute) consumes OutputStream,
293+
// which we've already read from, so Result will be empty.
294+
// TODO consider the suggested changes in https://github.com/sshnet/SSH.NET/issues/650
295+
296+
//Assert.AreEqual(expectedResult, actualResult);
297+
//Assert.AreEqual(expectedResult, command.Result);
298+
299+
// For now just assert the current behaviour.
300+
Assert.AreEqual(0, actualResult.Length);
301+
Assert.AreEqual(0, command.Result.Length);
302302
}
303303
}
304304
finally

test/Renci.SshNet.Tests/.editorconfig

+9-1
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,10 @@ dotnet_diagnostic.MA0110.severity = silent
305305
# https://github.com/meziantou/Meziantou.Analyzer/blob/main/docs/Rules/MA0026.md
306306
dotnet_diagnostic.MA0026.severity = silent
307307

308+
# MA0042: Do not use blocking calls in an async method
309+
# https://github.com/meziantou/Meziantou.Analyzer/blob/main/docs/Rules/MA0042.md
310+
dotnet_diagnostic.MA0042.severity = silent
311+
308312
#### .NET Compiler Platform analysers rules ####
309313

310314
# CA1031: Do not catch general exception types
@@ -397,4 +401,8 @@ dotnet_diagnostic.IDE0032.severity = silent
397401

398402
# CA1812: Avoid uninstantiated internal classes
399403
# https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/ca1812
400-
dotnet_diagnostic.CA1812.severity = silent
404+
dotnet_diagnostic.CA1812.severity = silent
405+
406+
# CA1849: Call async methods when in an async method
407+
# https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/CA1849
408+
dotnet_diagnostic.CA1849.severity = silent

0 commit comments

Comments
 (0)