如何使用反应式扩展轮询状态


关于

使用反应式(使用反应式扩展的数据库轮询)进行数据库轮询已经有一个很好的问题

我有一个类似的问题,但有一个转折:我需要将上一个结果中的一个值输入到下一个请求中。 基本上,我想对此进行民意调查:

interface ResultSet<T>
{
   int? CurrentAsOfHandle {get;}
   IList<T> Results {get;}
}
Task<ResultSet<T>> GetNewResultsAsync<T>(int? previousRequestHandle);

这个想法是,这将返回自上一个请求以来的所有新项目


  1. 每一分钟我都想打电话给GetNewResultsAsync
  2. 我想将上一次调用中的CurrentAsOf作为参数传递给previousRequest参数
  3. GetNewResultsAsync的下一次调用实际上应该在上一次调用后一分钟发生

基本上,有没有比以下更好的方法:

return Observable.Create<IMessage>(async (observer, cancellationToken) =>
{
    int? currentVersion = null;
    while (!cancellationToken.IsCancellationRequested)
    {
        MessageResultSet messageResultSet = await ReadLatestMessagesAsync(currentVersion);
        currentVersion = messageResultSet.CurrentVersionHandle;
        foreach (IMessage resultMessage in messageResultSet.Messages)
            observer.OnNext(resultMessage);
        await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken);
    }
});

另请注意,此版本允许在等待下一次迭代时收集messageResultSet(例如,我想也许我可以使用Scan将上一个结果集对象传递到下一次迭代中)

你的问题基本上可以简化为: 有一个带有签名的Scan函数:

IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, 
     TAccumulate initialValue, Func<TAccumulate, TSource, TAccumulate> accumulator)

但是你需要类似的东西

IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, 
     TAccumulate initialValue, Func<TAccumulate, TSource, IObservable<TAccumulate>> accumulator)

。其中累加器函数返回一个可观察量,Scan 函数会自动减少它以传递到下一个调用。

这是一个穷人对Scan的功能实现:

public static IObservable<TAccumulate> MyScan<TSource, TAccumulate>(this IObservable<TSource> source, 
    TAccumulate initialValue, Func<TAccumulate, TSource, TAccumulate> accumulator)
{
    return source
        .Publish(_source => _source
            .Take(1)
            .Select(s => accumulator(initialValue, s))
            .SelectMany(m => _source.MyScan(m, accumulator).StartWith(m))
        );
}

鉴于此,我们可以对其进行一些更改以合并还原功能:

public static IObservable<TAccumulate> MyObservableScan<TSource, TAccumulate>(this IObservable<TSource> source,
    TAccumulate initialValue, Func<TAccumulate, TSource, IObservable<TAccumulate>> accumulator)
{
    return source
        .Publish(_source => _source
            .Take(1)
            .Select(s => accumulator(initialValue, s))
            .SelectMany(async o => (await o.LastOrDefaultAsync())
                .Let(m => _source
                    .MyObservableScan(m, accumulator)
                    .StartWith(m)
                )
            )
            .Merge()
        );
}
//Wrapper to accommodate easy Task -> Observable transformations
public static IObservable<TAccumulate> MyObservableScan<TSource, TAccumulate>(this IObservable<TSource> source,
    TAccumulate initialValue, Func<TAccumulate, TSource, Task<TAccumulate>> accumulator)
{
    return source.MyObservableScan(initialValue, (a, s) => Observable.FromAsync(() => accumulator(a, s)));  
}
//Function to prevent re-evaluation in functional scenarios
public static U Let<T, U>(this T t, Func<T, U> selector)
{
    return selector(t);
}

现在我们有了这个花哨的MyObservableScan运算符,我们可以相对轻松地解决您的问题:

var o = Observable.Interval(TimeSpan.FromMinutes(1))
    .MyObservableScan<long, ResultSet<string>>(null, (r, _) => Methods.GetNewResultsAsync<string>(r?.CurrentAsOfHandle))

请注意,在测试中,我注意到如果累加器Task/Observable函数花费的时间超过源中的间隔,则可观察量将终止。我不知道为什么。如果有人可以纠正,非常有义务。

从那以后我发现Observable.Generate有一个重载几乎可以解决问题。 主要缺点是它不适用于 async .

public static IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler);

我传入null作为我的初始状态。 传入x => true作为我的条件(无休止地轮询)。 在iterate内部,我根据传入的状态进行实际轮询。 然后在timeSelector中返回轮询间隔。

所以:

var resultSets = Observable.Generate<ResultSet<IMessage>, IEnumerable<IMessage>>(
   //initial (empty) result
   new ResultSet<IMessage>(),
   //poll endlessly (until unsubscription)
   rs => true,
   //each polling iteration
   rs => 
   {
      //get the version from the previous result (which could be that initial result)
      int? previousVersion = rs.CurrentVersionHandle;
      //here's the problem, though: it won't work with async methods :(
      MessageResultSet messageResultSet = ReadLatestMessagesAsync(currentVersion).Result;
      return messageResultSet;
   },
   //we only care about spitting out the messages in a result set
   rs => rs.Messages, 
   //polling interval
   TimeSpan.FromMinutes(1),
   //which scheduler to run each iteration 
   TaskPoolScheduler.Default);
return resultSets
  //project the list of messages into a sequence
  .SelectMany(messageList => messageList);

相关内容

  • 没有找到相关文章

最新更新