Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 62 additions & 23 deletions src/Renci.SshNet/SftpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ public void DownloadFile(string path, Stream output, Action<ulong>? downloadCall

if (downloadCallback != null)
{
downloadProgress = new Progress<DownloadFileProgressReport>(r => downloadCallback(r.TotalBytesDownloaded));
downloadProgress = new SynchronousProgress<DownloadFileProgressReport>(r => downloadCallback(r.TotalBytesDownloaded));
}

InternalDownloadFile(
Expand Down Expand Up @@ -934,7 +934,7 @@ public Task DownloadFileAsync(string path, Stream output, IProgress<DownloadFile
path,
output,
asyncResult: null,
downloadProgress: downloadProgress,
downloadProgress,
isAsync: true,
cancellationToken);
}
Expand Down Expand Up @@ -1011,7 +1011,11 @@ public IAsyncResult BeginDownloadFile(string path, Stream output, AsyncCallback?

if (downloadCallback != null)
{
downloadProgress = new Progress<DownloadFileProgressReport>(r => downloadCallback(r.TotalBytesDownloaded));
// The System.Progress<T> ctor captures the current synchronization context
// and posts the progress reports to it. For back-compat with previous
// versions which always posted the callback to the threadpool regardless of
// sync context, we use a custom IProgress<T> impl.
downloadProgress = new ThreadPoolProgress<DownloadFileProgressReport>(r => downloadCallback(r.TotalBytesDownloaded));
}

var asyncResult = new SftpDownloadAsyncResult(asyncCallback, state);
Expand Down Expand Up @@ -1089,7 +1093,7 @@ public void UploadFile(Stream input, string path, bool canOverride, Action<ulong

if (uploadCallback != null)
{
uploadProgress = new Progress<UploadFileProgressReport>(r => uploadCallback(r.TotalBytesUploaded));
uploadProgress = new SynchronousProgress<UploadFileProgressReport>(r => uploadCallback(r.TotalBytesUploaded));
}
Comment on lines 1094 to 1097

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair point


InternalUploadFile(
Expand Down Expand Up @@ -1273,7 +1277,11 @@ public IAsyncResult BeginUploadFile(Stream input, string path, bool canOverride,

if (uploadCallback != null)
{
uploadProgress = new Progress<UploadFileProgressReport>(r => uploadCallback(r.TotalBytesUploaded));
// The System.Progress<T> ctor captures the current synchronization context
// and posts the progress reports to it. For back-compat with previous
// versions which always posted the callback to the threadpool regardless of
// sync context, we use a custom IProgress<T> impl.
uploadProgress = new ThreadPoolProgress<UploadFileProgressReport>(r => uploadCallback(r.TotalBytesUploaded));
}

var asyncResult = new SftpUploadAsyncResult(asyncCallback, state);
Expand Down Expand Up @@ -2417,16 +2425,10 @@ private async Task InternalDownloadFile(

asyncResult?.Update(totalBytesRead);

if (downloadProgress is not null)
downloadProgress?.Report(new DownloadFileProgressReport()
{
// Copy offset to ensure it's not modified between now and execution of callback
var report = new DownloadFileProgressReport()
{
TotalBytesDownloaded = totalBytesRead,
};

downloadProgress.Report(report);
}
TotalBytesDownloaded = totalBytesRead
});
}
}
finally
Expand Down Expand Up @@ -2536,16 +2538,10 @@ private async Task InternalUploadFile(

asyncResult?.Update(writtenBytes);

// Call callback to report number of bytes written
if (uploadProgress is not null)
uploadProgress?.Report(new UploadFileProgressReport()
{
UploadFileProgressReport report = new()
{
TotalBytesUploaded = writtenBytes,
};

uploadProgress.Report(report);
}
TotalBytesUploaded = writtenBytes
});
}
finally
{
Expand Down Expand Up @@ -2652,5 +2648,48 @@ private ISftpSession CreateAndConnectToSftpSession()
throw;
}
}

/// <summary>
/// An <see cref="IProgress{T}"/> implementation that posts callbacks to the threadpool.
/// </summary>
private sealed class ThreadPoolProgress<T> : IProgress<T>
{
private readonly Action<T> _handler;

public ThreadPoolProgress(Action<T> handler)
{
Debug.Assert(handler != null);
_handler = handler!;
}

void IProgress<T>.Report(T value)
{
_ = ThreadPool.QueueUserWorkItem(static state =>
{
var (handler, value) = ((Action<T>, T))state!;
handler(value);
},
(_handler, value));
}
}

/// <summary>
/// An <see cref="IProgress{T}"/> implementation that invokes callbacks synchronously.
/// </summary>
private sealed class SynchronousProgress<T> : IProgress<T>
{
private readonly Action<T> _handler;

public SynchronousProgress(Action<T> handler)
{
Debug.Assert(handler != null);
_handler = handler!;
}

void IProgress<T>.Report(T value)
{
_handler.Invoke(value);
}
Comment on lines +2652 to +2692
}
}
}