我有一个FTP客户端,我想让它与FTP服务器保持连接,除非(比如)一分钟过去了,没有任何活动。我想使用可观察量来做到这一点。
下面是一个非常笨拙的 Linqpad 脚本,演示了这个概念:
async Task Main()
{
var client = new Client();
client.Connect();
var debounce = new Subject<int>();
debounce
.Throttle(TimeSpan.FromSeconds(1))
.Subscribe(eventNumber => client.Disconnect(eventNumber));
// Something uses the FTP client
debounce.OnNext(1);
await Task.Delay(200);
// Something else uses the FTP client
debounce.OnNext(2);
await Task.Delay(300);
// No activity, the client will disconnect
await Task.Delay(1000);
}
public class Client
{
public void Connect() => Console.WriteLine("Connected");
public void Disconnect(int eventNumber) => Console.WriteLine($"Disconnected: {eventNumber}");
}
这非常有效 - 客户端在事件"2"后断开连接。
问题:有没有更好的方法?或者更准确地说,有没有更好的方法可以在不使用Subject
的情况下做到这一点?
编辑
这是一个更充实的类版本 - 实际上,它订阅了一个可观察量,它会告诉它一些需要下载的文件;如果一段时间没有文件通过,那么我希望客户端断开连接。
public class MyClassThatDownloadsViaFtp
{
private IObserver<Unit> _debouncer;
private FtpClient _client;
public MyClassThatDownloadsViaFtp(IObservable<FileToDownload> filesToDownloadViaFtp)
{
filesToDownloadViaFtp.Subscribe(DownloadFileViaFtp);
// Disconnect after a minute of activity
_debouncer = new Subject<Unit>();
_debouncer
.Throttle(TimeSpan.FromMinutes(1))
.Subscribe(_ => DisconnectFtpClient());
}
public void DownloadFileViaFtp(FileToDownload file)
{
if (_client == null) _client = ConnectFtpClient();
// Signal that the client is doing some work to prevent disconnect
_debouncer.OnNext(Unit.Default);
_client.Download(file.PathOnFtpServer);
}
// implementation irrelivent
private FtpClient ConnectFtpClient() => new FtpClient();
private FtpClient DisconnectFtpClient() => _client = null;
}
我发现,由于我有一个源流,因此限制它以达到相同的效果可能更容易(如下所示);但是,我仍然想知道在没有可以限制的源流的情况下执行此操作的最佳方法。
public class MyClassThatDownloadsViaFtp
{
private FtpClient _client;
public MyClassThatDownloadsViaFtp(IObservable<FileToDownload> filesToDownloadViaFtp)
{
filesToDownloadViaFtp
.Select(DownloadFileViaFtp)
.Throttle(TimeSpan.FromMinutes(1))
.Subscribe(_ => DisconnectFtpClient());
}
public Unit DownloadFileViaFtp(FileToDownload file)
{
if (_client == null) _client = ConnectFtpClient();
_client.Download(file.PathOnFtpServer);
return Unit.Default;
}
// implementation irrelivent
private FtpClient ConnectFtpClient() => new FtpClient();
private FtpClient DisconnectFtpClient() => _client = null;
}
你基本上用这个回答了你的问题:
public MyClassThatDownloadsViaFtp(IObservable<FileToDownload> filesToDownloadViaFtp)
{
filesToDownloadViaFtp
.Select(DownloadFileViaFtp)
.Throttle(TimeSpan.FromMinutes(1))
.Subscribe(_ => DisconnectFtpClient());
}
如果您没有像filesToDownloadViaFtp
这样方便的流,请从Observable.Create
或Observable.FromEvent
或Observable.FromEventPattern
等创建一个。
一个狡辩:理想情况下Select
运行没有副作用,DownloadFileViaFtp
在很大程度上是一种副作用。副作用最好在Subscribe
通话中。