-
-
Notifications
You must be signed in to change notification settings - Fork 978
Expand file tree
/
Copy pathSftpFileReader.cs
More file actions
484 lines (408 loc) · 17.7 KB
/
SftpFileReader.cs
File metadata and controls
484 lines (408 loc) · 17.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Runtime.ExceptionServices;
using System.Threading;
using Renci.SshNet.Abstractions;
using Renci.SshNet.Common;
namespace Renci.SshNet.Sftp
{
internal sealed class SftpFileReader : ISftpFileReader
{
/// <summary>
/// Timeout, in milliseconds, passed to the semaphore wait in the read-ahead loop before the
/// loop re-evaluates its exit conditions.
/// </summary>
private const int ReadAheadWaitTimeoutInMilliseconds = 1000;
/// <summary>
/// Holds the file handle that is passed to every read request and to the close request.
/// </summary>
private readonly byte[] _handle;
/// <summary>
/// Holds the session used to issue the read and close requests and to wait on the wait handles.
/// </summary>
private readonly ISftpSession _sftpSession;
/// <summary>
/// Holds the number of bytes requested per read-ahead, which is also the amount by which the
/// read-ahead offset advances after each request.
/// </summary>
private readonly uint _chunkSize;
/// <summary>
/// Created with the maximum number of pending reads; the read-ahead loop acquires it before
/// each request and <c>Read()</c> releases it when a chunk has been handed to the caller.
/// </summary>
private readonly SemaphoreSlim _semaphore;
/// <summary>
/// Lock object guarding <see cref="_queue"/>; also used with <see cref="Monitor"/> wait/pulse
/// to wake up a blocked <c>Read()</c>.
/// </summary>
private readonly object _readLock;
/// <summary>
/// Set while disposing to interrupt the wait in the read-ahead loop.
/// </summary>
private readonly ManualResetEvent _disposingWaitHandle;
/// <summary>
/// Set by the read-ahead loop when it has exited; disposing waits on it.
/// </summary>
private readonly ManualResetEvent _readAheadCompleted;
/// <summary>
/// Holds the completed read-ahead chunks, keyed by chunk index.
/// </summary>
private readonly Dictionary<int, BufferedRead> _queue;
/// <summary>
/// Holds the wait handles created by the session from <see cref="_disposingWaitHandle"/> and the
/// semaphore's available wait handle; waited on before each read-ahead.
/// </summary>
private readonly WaitHandle[] _waitHandles;
/// <summary>
/// Holds the size of the file, when available.
/// </summary>
private readonly long? _fileSize;
/// <summary>
/// Holds the file offset of the next byte that <c>Read()</c> hands to the caller.
/// </summary>
private ulong _offset;
/// <summary>
/// Holds the chunk index that is assigned to the next read-ahead request.
/// </summary>
private int _readAheadChunkIndex;
/// <summary>
/// Holds the file offset at which the next read-ahead request starts.
/// </summary>
private ulong _readAheadOffset;
/// <summary>
/// Holds the index of the chunk that <c>Read()</c> looks up in <see cref="_queue"/> next.
/// </summary>
private int _nextChunkIndex;
/// <summary>
/// Holds a value indicating whether EOF has already been signaled by the SSH server.
/// </summary>
private bool _endOfFileReceived;
/// <summary>
/// Holds a value indicating whether the client has read up to the end of the file.
/// </summary>
private bool _isEndOfFileRead;
/// <summary>
/// Holds a value indicating whether disposal of the current instance has started or completed.
/// </summary>
private bool _disposingOrDisposed;
/// <summary>
/// Holds the exception that put the reader in an error (or disposed) state; when set, it stops
/// the read-ahead loop and is rethrown by <c>Read()</c>.
/// </summary>
private Exception _exception;
/// <summary>
/// Initializes a new instance of the <see cref="SftpFileReader"/> class for the given file handle
/// and <see cref="ISftpSession"/>, and immediately starts reading ahead.
/// </summary>
/// <param name="handle">The file handle.</param>
/// <param name="sftpSession">The SFTP session.</param>
/// <param name="chunkSize">The size of an individual read-ahead chunk.</param>
/// <param name="maxPendingReads">The maximum number of pending reads.</param>
/// <param name="fileSize">The size of the file, if known; otherwise, <see langword="null"/>.</param>
/// <param name="offset">The offset to resume from.</param>
public SftpFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, int maxPendingReads, long? fileSize, ulong offset = 0)
{
    // what to read, and from where
    _sftpSession = sftpSession;
    _handle = handle;
    _fileSize = fileSize;
    _chunkSize = chunkSize;

    // When a download is resumed (offset > 0), both the caller-facing position and the
    // read-ahead position start at the offset of the local output file.
    _readAheadOffset = offset;
    _offset = offset;

    // bookkeeping and synchronization for at most maxPendingReads outstanding reads
    _semaphore = new SemaphoreSlim(maxPendingReads);
    _queue = new Dictionary<int, BufferedRead>(maxPendingReads);
    _readLock = new object();
    _readAheadCompleted = new ManualResetEvent(false);
    _disposingWaitHandle = new ManualResetEvent(false);
    _waitHandles = sftpSession.CreateWaitHandleArray(_disposingWaitHandle, _semaphore.AvailableWaitHandle);

    StartReadAhead();
}
/// <summary>
/// Returns the next chunk of the file, blocking until the corresponding read-ahead has completed,
/// an exception has been recorded, or the current instance is disposed.
/// </summary>
/// <returns>
/// The bytes that were read; a zero-length array signals that the end of the file was reached.
/// </returns>
/// <exception cref="ObjectDisposedException">The current instance is disposing or disposed.</exception>
/// <exception cref="SshException">
/// The end of the file was already read, or the catch-up read unexpectedly returned no data.
/// </exception>
/// <remarks>
/// An exception that was recorded earlier (by the read-ahead or a previous call) is rethrown.
/// </remarks>
public byte[] Read()
{
#if NET7_0_OR_GREATER
ObjectDisposedException.ThrowIf(_disposingOrDisposed, this);
#else
if (_disposingOrDisposed)
{
throw new ObjectDisposedException(GetType().FullName);
}
#endif // NET7_0_OR_GREATER
// rethrow a previously recorded failure
if (_exception is not null)
{
ExceptionDispatchInfo.Capture(_exception).Throw();
}
if (_isEndOfFileRead)
{
throw new SshException("Attempting to read beyond the end of the file.");
}
BufferedRead nextChunk;
lock (_readLock)
{
// wait until either the next chunk is available, an exception has occurred or the current
// instance is already disposed
while (!_queue.TryGetValue(_nextChunkIndex, out nextChunk) && _exception is null)
{
_ = Monitor.Wait(_readLock);
}
// throw when an exception occurred in read-ahead, or the current instance is already disposed
if (_exception != null)
{
ExceptionDispatchInfo.Capture(_exception).Throw();
}
var data = nextChunk.Data;
// the chunk starts exactly where the caller left off, so it can be returned as-is
if (nextChunk.Offset == _offset)
{
// have we reached EOF?
if (data.Length == 0)
{
// PERF: we do not bother updating all of the internal state when we've reached EOF
_isEndOfFileRead = true;
}
else
{
// remove processed chunk
_ = _queue.Remove(_nextChunkIndex);
// update offset
_offset += (ulong)data.Length;
// move to next chunk
_nextChunkIndex++;
}
// unblock wait in read-ahead
_ = _semaphore.Release();
return data;
}
// When we received an EOF for the next chunk and the size of the file is known, then
// we only complete the current chunk if we haven't already read up to the file size.
// This way we save an extra round-trip to the server.
if (data.Length == 0 && _fileSize.HasValue && _offset == (ulong)_fileSize.Value)
{
// avoid future reads
_isEndOfFileRead = true;
// unblock wait in read-ahead
_ = _semaphore.Release();
// signal EOF to caller
return nextChunk.Data;
}
}
/*
* When the server returned fewer bytes than requested (for the previous chunk)
* we'll synchronously request the remaining data.
*
* Due to the optimization above, we'll only get here in one of the following cases:
* - an EOF situation for files for which we were unable to obtain the file size
* - fewer bytes than requested were returned
*
* According to the SSH specification, this last case should never happen for normal
* disk files (but can happen for device files). In practice, OpenSSH - for example -
* returns fewer bytes than requested when requesting more than 64 KB.
*
* Important:
* To avoid a deadlock, this read must be done outside of the read lock.
*/
// number of bytes between the caller's position and the start of the next queued chunk
var bytesToCatchUp = nextChunk.Offset - _offset;
/*
* TODO: break loop and interrupt blocking wait in case of exception
*/
var read = _sftpSession.RequestRead(_handle, _offset, (uint)bytesToCatchUp);
if (read.Length == 0)
{
// process data in read lock to avoid ObjectDisposedException while releasing semaphore
lock (_readLock)
{
// a zero-length (EOF) response is only valid for the read-back when EOF has
// been signaled for the next read-ahead chunk
if (nextChunk.Data.Length == 0)
{
_isEndOfFileRead = true;
// ensure we've not yet disposed the current instance
if (!_disposingOrDisposed)
{
// unblock wait in read-ahead
_ = _semaphore.Release();
}
// signal EOF to caller
return read;
}
// move reader to error state
_exception = new SshException("Unexpectedly reached end of file.");
// ensure we've not yet disposed the current instance
if (!_disposingOrDisposed)
{
// unblock wait in read-ahead
_ = _semaphore.Release();
}
// notify caller of error
throw _exception;
}
}
// The queued chunk stays in the queue; only the caller's position advances, so the next
// call compares the same chunk against the new offset.
// NOTE(review): this uses a (uint) cast where the in-lock path uses (ulong); the value is
// the same because an array length is never negative.
_offset += (uint)read.Length;
return read;
}
/// <summary>
/// Finalizes the current <see cref="SftpFileReader"/> by running the non-disposing cleanup path.
/// </summary>
~SftpFileReader() => Dispose(disposing: false);
/// <summary>
/// Performs the disposing cleanup of the current instance and suppresses its finalization.
/// </summary>
public void Dispose()
{
Dispose(disposing: true);
// cleanup has already run, so the finalizer no longer needs to
GC.SuppressFinalize(this);
}
/// <summary>
/// Releases unmanaged and - optionally - managed resources.
/// </summary>
/// <param name="disposing"><see langword="true"/> to release both managed and unmanaged resources; <see langword="false"/> to release only unmanaged resources.</param>
/// <remarks>
/// When <paramref name="disposing"/> is <see langword="false"/>, only the disposing/disposed flag is set.
/// Calling this method again after the flag has been set is a no-op.
/// </remarks>
private void Dispose(bool disposing)
{
if (_disposingOrDisposed)
{
return;
}
// transition to disposing state
_disposingOrDisposed = true;
if (disposing)
{
// record exception to prevent future calls to Read() from succeeding
_exception = new ObjectDisposedException(GetType().FullName);
// signal that we're disposing to interrupt wait in read-ahead
_ = _disposingWaitHandle.Set();
// wait until the read-ahead thread has completed
_ = _readAheadCompleted.WaitOne();
// unblock the Read()
lock (_readLock)
{
// dispose semaphore in read lock to ensure we don't run into an ObjectDisposedException
// in Read()
_semaphore.Dispose();
// awake Read
Monitor.PulseAll(_readLock);
}
_readAheadCompleted.Dispose();
_disposingWaitHandle.Dispose();
// close the file handle, but only while the session is still open
if (_sftpSession.IsOpen)
{
try
{
var closeAsyncResult = _sftpSession.BeginClose(_handle, callback: null, state: null);
_sftpSession.EndClose(closeAsyncResult);
}
catch (Exception ex)
{
// a failure to close the handle is logged and not propagated to the caller
DiagnosticAbstraction.Log("Failure closing handle: " + ex);
}
}
}
}
/// <summary>
/// Starts the read-ahead loop, which keeps requesting chunks of <c>_chunkSize</c> bytes
/// until EOF is received, an exception is recorded, or the current instance is being disposed.
/// </summary>
/// <remarks>
/// The loop is handed to <c>ThreadAbstraction.ExecuteThread</c> and signals
/// <c>_readAheadCompleted</c> when it exits.
/// </remarks>
private void StartReadAhead()
{
ThreadAbstraction.ExecuteThread(() =>
{
while (!_endOfFileReceived && _exception is null)
{
// check if we should continue with the read-ahead loop
// note that the EOF and exception check are not included
// in this check as they do not require Read() to be
// unblocked (or have already done this)
if (!ContinueReadAhead())
{
// unblock the Read()
lock (_readLock)
{
Monitor.PulseAll(_readLock);
}
// break the read-ahead loop
break;
}
// attempt to obtain the semaphore; this may time out when all semaphores are
// in use due to pending read-aheads (which in turn can happen when the server
// is slow to respond or when the session is broken)
if (!_semaphore.Wait(ReadAheadWaitTimeoutInMilliseconds))
{
// re-evaluate whether an exception occurred, and - if not - wait again
continue;
}
// don't bother reading any more chunks if we received EOF, an exception has occurred
// or the current instance is disposed
if (_endOfFileReceived || _exception != null)
{
break;
}
// start reading next chunk
var bufferedRead = new BufferedRead(_readAheadChunkIndex, _readAheadOffset);
try
{
// even if we know the size of the file and have read up to EOF, we still want
// to keep reading (ahead) until we receive zero bytes from the remote host as
// we do not want to rely purely on the reported file size
//
// if the offset of the read-ahead chunk is greater than the file size, then
// we can expect to be reading the last (zero-byte) chunk and switch to synchronous
// mode to avoid having multiple read-aheads that read beyond EOF
if (_fileSize != null && (long)_readAheadOffset > _fileSize.Value)
{
// synchronous mode: wait for the result before the loop continues
var asyncResult = _sftpSession.BeginRead(_handle, _readAheadOffset, _chunkSize, callback: null, bufferedRead);
var data = _sftpSession.EndRead(asyncResult);
ReadCompletedCore(bufferedRead, data);
}
else
{
// asynchronous mode: the result is processed in the ReadCompleted callback
_ = _sftpSession.BeginRead(_handle, _readAheadOffset, _chunkSize, ReadCompleted, bufferedRead);
}
}
catch (Exception ex)
{
// record the failure, wake up Read() and stop reading ahead
HandleFailure(ex);
break;
}
// advance read-ahead offset
_readAheadOffset += _chunkSize;
// increment index of read-ahead chunk
_readAheadChunkIndex++;
}
// signal that the read-ahead loop has exited; Dispose waits for this
_ = _readAheadCompleted.Set();
});
}
/// <summary>
/// Waits until either the current instance is being disposed or the semaphore becomes available,
/// and reports whether the read-ahead loop should keep going.
/// </summary>
/// <returns>
/// <see langword="true"/> when the semaphore is available; <see langword="false"/> when disposing
/// was signaled or when the wait failed (in which case the failure is recorded).
/// </returns>
private bool ContinueReadAhead()
{
    try
    {
        var signaledIndex = _sftpSession.WaitAny(_waitHandles, _sftpSession.OperationTimeout);

        return signaledIndex switch
        {
            // disposing
            0 => false,

            // semaphore available
            1 => true,

            _ => throw new NotImplementedException(string.Format(CultureInfo.InvariantCulture, "WaitAny return value '{0}' is not implemented.", signaledIndex)),
        };
    }
    catch (Exception failure)
    {
        // keep the first recorded exception, if any
        _ = Interlocked.CompareExchange(ref _exception, failure, comparand: null);
        return false;
    }
}
/// <summary>
/// Callback for an asynchronous read-ahead: collects the result and hands it to
/// <see cref="ReadCompletedCore"/>, or records the failure.
/// </summary>
/// <param name="result">The result of the asynchronous read.</param>
private void ReadCompleted(IAsyncResult result)
{
    // When the current instance is being disposed, nothing is processed so that
    // disposed members are not touched.
    if (!_disposingOrDisposed)
    {
        var completedRead = (SftpReadAsyncResult)result;

        byte[] chunk;

        try
        {
            chunk = completedRead.EndInvoke();
        }
        catch (Exception failure)
        {
            HandleFailure(failure);
            return;
        }

        // A zero-byte result signals EOF, but reads that were issued before this one
        // may still be pending.
        var completedChunk = (BufferedRead)completedRead.AsyncState;
        ReadCompletedCore(completedChunk, chunk);
    }
}
/// <summary>
/// Stores the data of a completed read in the queue, wakes up any waiting reader and
/// flags EOF when the server returned no data.
/// </summary>
/// <param name="bufferedRead">The read-ahead chunk that completed.</param>
/// <param name="data">The bytes returned for the chunk.</param>
private void ReadCompletedCore(BufferedRead bufferedRead, byte[] data)
{
    bufferedRead.Complete(data);

    lock (_readLock)
    {
        // Publish the chunk and wake up Read(); whether it is data or EOF, Read() will
        // eventually unblock the read-ahead loop as well.
        _queue.Add(bufferedRead.ChunkIndex, bufferedRead);
        Monitor.PulseAll(_readLock);
    }

    if (data.Length != 0)
    {
        return;
    }

    // the server signaled EOF: stop issuing further read-aheads
    _endOfFileReceived = true;
}
/// <summary>
/// Moves the reader into the error state and wakes up both the read-ahead loop and any
/// blocked <see cref="Read"/>.
/// </summary>
/// <param name="cause">The exception that caused the failure.</param>
private void HandleFailure(Exception cause)
{
    // only the first failure is recorded; a later one does not replace it
    _ = Interlocked.CompareExchange(ref _exception, cause, comparand: null);

    lock (_readLock)
    {
        // This method can run on a read-completion callback that races with Dispose.
        // Dispose sets _disposingOrDisposed before it disposes the semaphore inside this
        // same lock, so when the flag is still clear here the semaphore is guaranteed to
        // be alive. Releasing it outside the lock could throw ObjectDisposedException.
        if (!_disposingOrDisposed)
        {
            // unblock read-ahead
            _ = _semaphore.Release();
        }

        // unblock Read()
        Monitor.PulseAll(_readLock);
    }
}
/// <summary>
/// Describes a single read-ahead chunk: its index, its offset and - once completed - its data.
/// </summary>
internal sealed class BufferedRead
{
    /// <summary>
    /// Initializes a new instance of the <see cref="BufferedRead"/> class.
    /// </summary>
    /// <param name="chunkIndex">The index of the chunk.</param>
    /// <param name="offset">The offset of the chunk.</param>
    public BufferedRead(int chunkIndex, ulong offset)
    {
        Offset = offset;
        ChunkIndex = chunkIndex;
    }

    /// <summary>
    /// Gets the index of the chunk.
    /// </summary>
    public int ChunkIndex { get; }

    /// <summary>
    /// Gets the offset of the chunk.
    /// </summary>
    public ulong Offset { get; }

    /// <summary>
    /// Gets the data of the chunk; <see langword="null"/> until <see cref="Complete"/> is called.
    /// </summary>
    public byte[] Data { get; private set; }

    /// <summary>
    /// Stores the data that was read for the chunk.
    /// </summary>
    /// <param name="data">The data that was read.</param>
    public void Complete(byte[] data) => Data = data;
}
}
}