我有一个参数化的rest调用,应该每五秒钟用不同的参数执行一次:
Observable<TResult> restCall = api.method1(param1);
我需要创建一个Observable<TResult>
,它将每隔5秒用不同的param1值轮询restCall。如果api调用失败,我需要得到一个错误,并在5秒内进行下一次调用。只有当restCall完成(成功/错误)时,才能测量调用之间的间隔。
我目前正在使用RxJava,但一个.NET示例也很好。
简介
首先,我承认,我是一个.NET爱好者,我知道这种方法使用了一些在Java中没有直接等价物的习惯用法。但我相信你的话,并在这样的基础上继续,这是一个伟大的问题,.NET的家伙会喜欢,希望它能引导你在rx-java中走上正确的道路,这是我从未看过的。这是一条很长的答案,但它主要是解释-解决方案代码本身很短!
使用任一
我们首先需要整理一些工具来帮助解决这个问题。第一种是使用Either<TLeft, TRight>
类型。这一点很重要,因为每次调用都有两个可能的结果要么是好结果,要么是错误。但我们需要将它们封装在一个类型中——我们不能使用OnError将错误发回,因为这会终止结果流。两者中的任何一个看起来都有点像元组,使处理这种情况变得更容易。Rxx库有一个非常完整和良好的Either
实现,但这里有一个简单的通用用法示例,然后是一个对我们来说很好的简单实现:
var goodResult = Either.Right<Exception,int>(1);
var exception = Either.Left<Exception,int>(new Exception());
/* base class for LeftValue and RightValue types */
public abstract class Either<TLeft, TRight>
{
public abstract bool IsLeft { get; }
public bool IsRight { get { return !IsLeft; } }
public abstract TLeft Left { get; }
public abstract TRight Right { get; }
}
public static class Either
{
public sealed class LeftValue<TLeft, TRight> : Either<TLeft, TRight>
{
TLeft _leftValue;
public LeftValue(TLeft leftValue)
{
_leftValue = leftValue;
}
public override TLeft Left { get { return _leftValue; } }
public override TRight Right { get { return default(TRight); } }
public override bool IsLeft { get { return true; } }
}
public sealed class RightValue<TLeft, TRight> : Either<TLeft, TRight>
{
TRight _rightValue;
public RightValue(TRight rightValue)
{
_rightValue = rightValue;
}
public override TLeft Left { get { return default(TLeft); } }
public override TRight Right { get { return _rightValue; } }
public override bool IsLeft { get { return false; } }
}
// Factory functions to create left or right-valued Either instances
public static Either<TLeft, TRight> Left<TLeft, TRight>(TLeft leftValue)
{
return new LeftValue<TLeft, TRight>(leftValue);
}
public static Either<TLeft, TRight> Right<TLeft, TRight>(TRight rightValue)
{
return new RightValue<TLeft, TRight>(rightValue);
}
}
注意,按照惯例,当使用"要么"来建模成功或失败时,右侧用于成功值,因为它当然是"右侧":)
一些帮助程序函数
我将用一些辅助函数模拟问题的两个方面。首先,这里有一个生成参数的工厂——每次调用它时,它都会返回以1:开头的整数序列中的下一个整数
// An infinite supply of parameters
private static int count = 0;
public int ParameterFactory()
{
return ++count;
}
接下来,这里有一个函数,它将Rest调用模拟为IObservable。此函数接受整数和:
- 如果整数为偶数,则返回一个Observable,该Observable会立即发送一个OnError
- 如果整数是奇数,它会返回一个字符串,将整数与"-ret"连接起来,但仅在一秒钟后返回。我们将使用它来检查轮询间隔是否按照您的请求运行——无论调用需要多长时间,都是完成调用之间的暂停,而不是常规间隔
这里是:
// A asynchronous function representing the REST call
public IObservable<string> SomeRestCall(int x)
{
return x % 2 == 0
? Observable.Throw<string>(new Exception())
: Observable.Return(x + "-ret").Delay(TimeSpan.FromSeconds(1));
}
现在好的一点
下面是一个相当通用的可重用函数,我称之为Poll
。它接受一个将被轮询的异步函数、该函数的参数工厂、所需的rest(并非双关语!)间隔,以及最后要使用的IScheduler。
我能想到的最简单的方法是使用Observable.Create
,它使用调度器来驱动结果流。ScheduleAsync
是一种使用.NET async/await形式的调度方式。这是一种.NET习惯用法,允许您以命令式方式编写异步代码。async
关键字引入了一个异步函数,该函数可以在其主体中await
一个或多个异步调用,并且只有在调用完成时才会继续。在这个问题中,我写了一篇关于这种调度风格的长篇解释,其中包括旧的递归风格,这种风格可能更容易在rx-java方法中实现。代码如下:
public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
Func<TArg, IObservable<TResult>> asyncFunction,
Func<TArg> parameterFactory,
TimeSpan interval,
IScheduler scheduler)
{
return Observable.Create<Either<Exception, TResult>>(observer =>
{
return scheduler.ScheduleAsync(async (ctrl, ct) => {
while(!ct.IsCancellationRequested)
{
try
{
var result = await asyncFunction(parameterFactory());
observer.OnNext(Either.Right<Exception,TResult>(result));
}
catch(Exception ex)
{
observer.OnNext(Either.Left<Exception, TResult>(ex));
}
await ctrl.Sleep(interval, ct);
}
});
});
}
细分来看,Observable.Create
通常是一个创建IObservable的工厂,它可以让您在很大程度上控制如何将结果发布给观察者。它经常被忽视,而倾向于不必要的复杂的基元组成。
在这种情况下,我们使用它来创建Either<TResult, Exception>
流,以便返回成功和失败的轮询结果。
Create
函数接受一个观察器,该观察器表示我们通过OnNext/OnError/OnCompleted将结果传递给的订阅服务器。我们需要在Create
调用中返回一个IDisposable
——在.NET中,这是订阅服务器可以取消订阅的句柄。这一点特别重要,因为否则投票将永远进行下去——或者至少永远不会OnComplete
。
ScheduleAsync
(或普通Schedule
)的结果就是这样一个句柄。当被处理时,它将取消我们计划的任何未决事件,从而结束轮询循环。在我们的情况下,我们用于管理间隔的Sleep
是可取消操作,尽管Poll函数可以很容易地修改为接受也接受CancellationToken
的可取消asyncFunction
。
ScheduleAsync方法接受一个将被调用以安排事件的函数。它被传递了两个参数,第一个ctrl
是调度器本身。第二个ct
是一个CancellationToken,我们可以使用它来查看是否已请求取消(通过订阅者处理其订阅句柄)。
轮询本身是通过无限while循环执行的,只有当CancellationToken指示已请求取消时,该循环才会终止。
在循环中,我们可以使用async/await的魔力异步调用轮询函数,但仍将其封装在异常处理程序中。这太棒了!假设没有错误,我们通过OnNext
将结果作为Either
的右值发送给观测者。如果出现异常,我们将其作为Either
的左值发送给观测者。最后,我们在调度器上使用Sleep
函数来调度休息间隔后的唤醒调用——不要与Thread.Sleep
调用混淆,这个调用通常不会阻塞任何线程。请注意,Sleep接受CancellationToken
,使其也可以中止!
我想你会同意这是一个非常酷的使用async/await来简化原本非常棘手的问题!
示例用法
最后,这里有一些调用Poll
的测试代码,以及示例输出-对于LINQPad粉丝来说,这个答案中的所有代码都将在LINQPad中运行,并引用Rx 2.1程序集:
void Main()
{
var subscription = Poll(SomeRestCall,
ParameterFactory,
TimeSpan.FromSeconds(5),
ThreadPoolScheduler.Instance)
.TimeInterval()
.Subscribe(x => {
Console.Write("Interval: " + x.Interval);
var result = x.Value;
if(result.IsRight)
Console.WriteLine(" Success: " + result.Right);
else
Console.WriteLine(" Error: " + result.Left.Message);
});
Console.ReadLine();
subscription.Dispose();
}
Interval: 00:00:01.0027668 Success: 1-ret
Interval: 00:00:05.0012461 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0009684 Success: 3-ret
Interval: 00:00:05.0003127 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0113053 Success: 5-ret
Interval: 00:00:05.0013136 Error: Exception of type 'System.Exception' was thrown.
请注意,如果立即返回错误,则结果之间的间隔为5秒(轮询间隔),如果成功,则为6秒(轮询周期加上模拟REST调用持续时间)。
EDIT-这里有一个替代实现,不使用ScheduleAsync,而是使用旧式递归调度,并且没有async/await语法。正如您所看到的,它要混乱得多,但它也支持取消可观察到的asyncFunction。
public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
Func<TArg, IObservable<TResult>> asyncFunction,
Func<TArg> parameterFactory,
TimeSpan interval,
IScheduler scheduler)
{
return Observable.Create<Either<Exception, TResult>>(
observer =>
{
var disposable = new CompositeDisposable();
var funcDisposable = new SerialDisposable();
bool cancelRequested = false;
disposable.Add(Disposable.Create(() => { cancelRequested = true; }));
disposable.Add(funcDisposable);
disposable.Add(scheduler.Schedule(interval, self =>
{
funcDisposable.Disposable = asyncFunction(parameterFactory())
.Finally(() =>
{
if (!cancelRequested) self(interval);
})
.Subscribe(
res => observer.OnNext(Either.Right<Exception, TResult>(res)),
ex => observer.OnNext(Either.Left<Exception, TResult>(ex)));
}));
return disposable;
});
}
请参阅我的另一个答案,了解一种不同的方法,该方法避免了.NET 4.5异步/等待功能,并且不使用Schedule调用。
我真的希望这能对rx-java的人有所帮助!
我已经清理了不直接使用Schedule调用的方法-使用我的另一个答案中的任一类型-它也可以使用相同的测试代码并给出相同的结果:
public IObservable<Either<Exception, TResult>> Poll2<TResult, TArg>(
Func<TArg, IObservable<TResult>> asyncFunction,
Func<TArg> parameterFactory,
TimeSpan interval,
IScheduler scheduler)
{
return Observable.Create<Either<Exception, TResult>>(
observer =>
Observable.Defer(() => asyncFunction(parameterFactory()))
.Select(Either.Right<Exception, TResult>)
.Catch<Either<Exception, TResult>, Exception>(
ex => Observable.Return(Either.Left<Exception, TResult>(ex)))
.Concat(Observable.Defer(
() => Observable.Empty<Either<Exception, TResult>>()
.Delay(interval, scheduler)))
.Repeat().Subscribe(observer));
}
这具有适当的取消语义。
实施说明
- 整个构造使用Repeat来获得循环行为
- 初始延时用于确保在每次迭代中传递不同的参数
- Select将OnNext结果投影到右侧的"任一">
- Catch将OnError结果投影到左侧的"非此即彼"-请注意,此异常终止了当前可观察到的asyncFunction,因此需要重复
- Concat添加间隔延迟
我的观点是调度版本更可读,但这一版本不使用async/await,因此与.NET 4.0兼容。