通过可等待的 TCP 连接安全地发送可观察元素



我有一个可观察量,它包装了一个数据源,它会不断监视并在发生更改时吐出它们:

IObservable<String> ReadDatasource()
{
//returns data from the data source, and watches it for changes
}

在我的代码中,我有一个TCP连接

interface Connection
{
Task Send(String data);
Boolean IsAvailable { get; }
}

订阅可观察量:

Connection _connection;
ReadDatabase()
.SubscribeOn(NewThreadScheduler.Default)
.Subscribe(
onNext: async r =>
{
if (_connection.IsAvailable)
{
try
{
await _connection.Send(r);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}

如果在可观察量快速连续吐出大量数据时客户端关闭了连接,则仍有大量累积的任务在等待(至少我认为是这种情况(,然后由于连接不可用而抛出大量异常(_connection.IsAvailable已经被检查过(。FWIW 我无法在_connection.Send(data)方法中进行更改。在进入可观察序列中的下一个元素之前,我等待_connection.Send(data)完成没有问题。事实上,那可能更可取。

有没有一个简单的Rx风格来处理这种情况?

还有

大量的累积任务在等待着......然后由于连接不可用而引发大量异常

是的,这就是我对这段代码的期望。这并没有真正的问题,因为实际上每个发送都无法发送。如果您的代码可以正常工作,那么您可能只想保持原样。

否则。。。

(_connection。已选中可用(。

是的。Connection.IsAvailable是没有用的。Socket.Connected/TcpClient.Connected也是如此,就此而言。它们都是无用的,因为它们只告诉你是否已经发生了错误。呃,你已经知道了,因为最后一个调用已经抛出了异常。它们不提供任何保证,甚至猜测下一个方法是否会成功。这就是为什么您需要其他机制来检测套接字连接故障的原因。

我在等待_connection没有问题。发送(数据(以完成,然后再移动到可观察序列中的下一个元素。事实上,那可能更可取。

如果Connection是围绕没有写入队列的套接字的简单包装器,那么您绝对应该一次只对Send执行一次调用。这是因为在资源受限的场景中(即,始终在生产环境中,而不是在开发盒上(,套接字的"写入"操作可能仅将部分字节写入实际网络流。我假设您的Connection包装器通过继续写入直到发送整个数据缓冲区来处理部分写入。除非代码多次调用Send,否则这很好用 - 在这种情况下,您最终可能会以字节乱序(A然后写入B;A部分完成,包装器在另一次写入中发送其余A...B后(。

因此,您需要一个写入队列才能可靠运行。如果Connection已经提供了一个,那么我会说你不需要做任何其他事情;多次Send失败是正常的。但是,如果Connection只处理发送单个数据缓冲区而不对其写入请求进行排队,那么您需要自己执行此操作。

这最容易通过使用 TPL 数据流块来实现。具体来说,ActionBlock<T>

// Define the source observable.
var obs = ReadDatabase().SubscribeOn(NewThreadScheduler.Default);
// Create our queue which calls Send for each observable item.
var queue = new ActionBlock<string>(data => _connection.Send(data));
try
{
// Subscribe the queue to the observable and (asynchronously) wait for it to complete.
using (var subscription = obs.Subscribe(queue.AsObserver()))
await queue.Completion;
}
catch (Exception ex)
{
// The first exception thrown from Send will end up here.
Console.WriteLine(ex.Message);
}

数据流块理解异步代码,默认情况下,它们一次只处理一个项目。因此,此代码将一次调用一个Send,缓冲 FIFO 队列中的其他数据项,直到该Send完成。

数据流块具有"快速失败"行为,因此引发的第一个Send将出错块,导致它放弃所有剩余的排队写入。当块出错时,await queue.Completion将抛出,取消订阅可观察量并显示消息。

如果可观察量完成,则await queue.Completion将完成,再次取消订阅可观察量,并继续正常执行。

有关将 Rx 与 TPL 数据流接口的详细信息,请参阅我的 C# 并发手册,配方 7.7。您可能还会发现此堆栈溢出答案有助于理解为什么将asynclambda 传递给Subscribe并不理想。

相关内容

  • 没有找到相关文章

最新更新