如何在不重新启动进程的情况下重新启动与发送重置数据包的FTP服务器的通信



我们有一个(长期运行的(Windows服务,它使用FtpWebRequest定期与嵌入第三方设备上的FTP服务器通信。这在大多数情况下都很有效,但有时我们的服务会停止与设备的通信,但一旦您重新启动我们的服务,一切都会重新开始工作。

我花了一些时间用MCVE(包括在下面(调试它,并通过Wireshark发现,一旦通信开始失败,就没有网络流量流向外部FTP服务器(Wireshark中根本没有数据包流向该IP(。如果我尝试从同一台机器上的另一个应用程序(如Windows资源管理器(连接到同一个FTP,一切都很好。

在一切停止工作之前查看数据包,我看到来自设备的带有重置(RST(标志集的数据包,所以我怀疑这可能是问题所在。一旦我们运行服务的计算机上的网络堆栈的某个部分接收到重置数据包,它就会执行本文TCP重置部分中所述的操作,并阻止从我们的进程到设备的所有进一步通信。

据我所知,我们与设备的通信方式没有什么问题,而且大多数时候,完全相同的代码运行得很好。重现该问题的最简单方法(请参阅下面的MCVE(似乎是同时与FTP建立大量单独的连接,因此我怀疑当同时与FTP进行大量连接(并非全部由我们(时可能会出现该问题。

问题是,如果我们重新启动流程,一切都会正常进行,我们确实需要重新建立与设备的通信。有没有一种方法可以在不重新启动整个过程的情况下(在经过适当的时间后(重新建立通信?

不幸的是,FTP服务器嵌入在一个相当旧的第三方设备上运行,该设备不太可能更新以解决此问题,即使是这样,我们仍然希望/需要与所有已经在现场的设备进行通信,而不需要我们的客户在可能的情况下更新它们。

我们知道的选项:

  1. 使用命令行FTP客户端,例如内置在Windows中的客户端。

    • 这样做的一个缺点是,我们需要列出一个目录中的所有文件,然后只下载其中的一部分,所以我们必须编写逻辑来解析对此的响应
    • 我们还必须将文件下载到临时文件,而不是像现在这样下载到流中
  2. 创建另一个应用程序来处理FTP通信部分,我们在每个请求完成后将其拆除。

    • 这里的主要缺点是进程间通信有点麻烦

MCVE

这在LINQPad中运行,并且相当可靠地再现了问题。通常,前几个任务成功,然后出现问题,然后所有任务开始超时。在Wireshark中,我可以看到我的电脑和设备之间没有通信。

如果我再次运行脚本,则所有任务都会失败,直到我重新启动LINQPad或执行"取消所有线程并重置",从而重新启动LINQPad用于运行查询的进程。如果我做了这两件事中的任何一件,那么我们就回到了成功的前几项任务。

async Task Main() {
    var tasks = new List<Task>();
    var numberOfBatches = 3;
    var numberOfTasksPerBatch = 10;
    foreach (var batchNumber in Enumerable.Range(1, numberOfBatches)) {
        $"Starting tasks in batch {batchNumber}".Dump();
        tasks.AddRange(Enumerable.Range(1, numberOfTasksPerBatch).Select(taskNumber => Connect(batchNumber, taskNumber)));
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    await Task.WhenAll(tasks);
}
async Task Connect(int batchNumber, int taskNumber) {
    try {
        var client = new FtpClient();
        var result = await client.GetFileAsync(new Uri("ftp://192.168.0.191/logging/20140620.csv"), TimeSpan.FromSeconds(10));
        result.Count.Dump($"Task {taskNumber} in batch {batchNumber} succeeded");
    } catch (Exception e) {
        e.Dump($"Task {taskNumber} in batch {batchNumber} failed");
    }
}
public class FtpClient {
    public virtual async Task<ImmutableList<Byte>> GetFileAsync(Uri fileUri, TimeSpan timeout) {
        if (fileUri == null) {
            throw new ArgumentNullException(nameof(fileUri));
        }
        FtpWebRequest ftpWebRequest = (FtpWebRequest)WebRequest.Create(fileUri);
        ftpWebRequest.Method = WebRequestMethods.Ftp.DownloadFile;
        ftpWebRequest.UseBinary = true;
        ftpWebRequest.KeepAlive = false;
        using (var source = new CancellationTokenSource(timeout)) {
            try {
                using (var response = (FtpWebResponse)await ftpWebRequest.GetResponseAsync()
                    .WithWaitCancellation(source.Token)) {
                    using (Stream ftpStream = response.GetResponseStream()) {
                        if (ftpStream == null) {
                            throw new InvalidOperationException("No response stream");
                        }
                        using (var dataStream = new MemoryStream()) {
                            await ftpStream.CopyToAsync(dataStream, 4096, source.Token)
                                .WithWaitCancellation(source.Token);
                            return dataStream.ToArray().ToImmutableList();
                        }
                    }
                }
            } catch (OperationCanceledException) {
                throw new WebException(
                    String.Format("Operation timed out after {0} seconds.", timeout.TotalSeconds),
                    WebExceptionStatus.Timeout);
            } finally {
                ftpWebRequest.Abort();
            }
        }
    }
}
public static class TaskCancellationExtensions {
    /// http://stackoverflow.com/a/14524565/1512
    public static async Task<T> WithWaitCancellation<T>(
        this Task<T> task,
        CancellationToken cancellationToken) {
        // The task completion source. 
        var tcs = new TaskCompletionSource<Boolean>();
        // Register with the cancellation token.
        using (cancellationToken.Register(
            s => ((TaskCompletionSource<Boolean>)s).TrySetResult(true),
            tcs)) {
            // If the task waited on is the cancellation token...
            if (task != await Task.WhenAny(task, tcs.Task)) {
                throw new OperationCanceledException(cancellationToken);
            }
        }
        // Wait for one or the other to complete.
        return await task;
    }
    /// http://stackoverflow.com/a/14524565/1512
    public static async Task WithWaitCancellation(
        this Task task,
        CancellationToken cancellationToken) {
        // The task completion source. 
        var tcs = new TaskCompletionSource<Boolean>();
        // Register with the cancellation token.
        using (cancellationToken.Register(
            s => ((TaskCompletionSource<Boolean>)s).TrySetResult(true),
            tcs)) {
            // If the task waited on is the cancellation token...
            if (task != await Task.WhenAny(task, tcs.Task)) {
                throw new OperationCanceledException(cancellationToken);
            }
        }
        // Wait for one or the other to complete.
        await task;
    }
}

这让我想起了旧的(?(IE行为,即使网络在N次尝试失败后返回,也不会重新加载页面。

您应该尝试将FtpWebRequest的缓存策略设置为BypassCache

HttpRequestCachePolicy bypassPolicy = new HttpRequestCachePolicy(
    HttpRequestCacheLevel.BypassCache
);
ftpWebRequest.CachePolicy = bypassPolicy;

在设置CCD_ 3。

我在尝试连接到没有EnableSsl=true的ftps服务器时遇到了同样的问题。Wireshark显示RST命令,连接将失败两次,然后不会有更多请求离开网络,从而导致超时异常,即使在设置EnableSsl=true之后也是如此。

我发现设置ConnectionGroupName可以重置连接并使用新端口。例如:

request.ConnectionGroupName = Guid.NewGuid();

但是,使用此方法时要注意端口耗尽,请参阅https://learn.microsoft.com/en-us/troubleshoot/dotnet/framework/ports-run-out-use-connectiongroupname

最新更新