反应式扩展 - 以同步方式刷新主题/IObservable



>编辑:我已经编辑了几行代码,在IDE中运行时,它失败而没有错误或任何东西。

我是反应式扩展的新手,并且有一个我正在尝试解决的问题。 我使用 RX 对机器上的事件进行排队,然后每隔一段时间将该数据发送到服务器。 我的问题似乎是,当应用程序关闭时,任何类型的异步调用似乎都会取消而不是运行,因此最后一批事件永远不会发送。

我有一个主题,其中事件是我的数据类。 我现在知道主题可能不是最好的类,但我们在这里。

我的代码大致如下所示,为清楚起见,添加了一些注释:

IObservable<IList<Event>> eventsObserver = Instance.EventBuffer.ToList<Event>();
var eventsEnumerable = eventsObserver.ToEnumerable();
List<Event> events = new List<Event>();
try
{
    events = (List<Event>)eventsEnumerable.First();  // THIS LINE FAILS SILENTLY, EVEN IN DEBUGGER...
}
catch(Exception ex)
{
    System.Diagnostics.Debug.WriteLine("Error: " + ex.Message);
}
using (var client = new HttpClient())
{
    client.BaseAddress = new Uri(someURI);
    HttpResponseMessage response = client.PostAsync(somePage, new StringContent(SerializeToJSON(events))).Result;
    response.EnsureSuccessStatusCode();
}

如果我不同步调用 Web 服务器(使用".结果"(,它在那里失败。 我已经尝试了很多方法将IObservable中的数据获取到我可以发送的内容中,但是代码失败(通常带有某种错误的转换(,或者事件尚未放入我想要发送的数据结构中。 我知道RX本质上是异步的,我要求以同步的方式处理它,我认为会有一个解决方案。 提前感谢!

假设你控制了可观察源,你可以像Enigmativity指出的那样调用Observable.OnComplete()。否则,您可以尝试在缓冲之前保留收到的每个值的副本:

Observable.Do(x => localCopy = x).Buffer(..)

关闭时可以访问此本地副本。

无论如何,请注意,.First()在最新的 Rx 版本中被标记为过时,可能是为了避免您遇到的问题。

最新更新