Catching exceptions from Rx.NET Observable.FromAsync



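I have the following Reactive Extensions Subject. I need to log exceptions and cleanly shut down the potentially blocking async task that is created with Observable.FromAsync.

Relatedly, when the token is cancelled, anything awaiting on the token throws a TaskCanceledException.

How can I catch these exceptions, ignoring the TaskCanceledException that is expected on shutdown and logging the rest?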

    internal sealed class TradeAggregator : IDisposable
    {
        private readonly Subject<TradeExecuted> feed = new();
        private readonly CancellationTokenSource cts = new();
        private bool isStopped;
        private bool isDisposed;

        public TradeAggregator(Func<TradeAggregate, CancellationToken, Task> dispatch)
        {
            feed
                .GroupByUntil(
                    x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId),
                    x => Observable.Timer(TimeSpan.FromSeconds(5)))
                .SelectMany(x => x.ToList())
                .Select(trades => Observable.FromAsync(() => dispatch(AggregateTrades(trades), cts.Token)))
                .Concat() // Ensure that the results are serialized.
                .Subscribe(cts.Token); // Check status of calls.
        }

        private TradeAggregate AggregateTrades(IEnumerable<TradeExecuted> trades)
        {
            // Do stuff.
            return new TradeAggregate();
        }

        // The parameter type must match the Subject's element type.
        public void OnNext(TradeExecuted trade) => this.feed.OnNext(trade);

        public void Stop()
        {
            if (isStopped) return;
            isStopped = true;
            cts.Cancel();
        }

        public void Dispose()
        {
            if (isDisposed) return;
            isDisposed = true;

            Stop();
            feed.Dispose();
            cts.Dispose();
        }
    }

Use a different Subscribe overload, one that also takes an OnError handler:

    feed
        .GroupByUntil(
            x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId),
            x => Observable.Timer(TimeSpan.FromSeconds(5)))
        .SelectMany(x => x.ToList())
        .Select(trades => Observable.FromAsync(() => dispatch(AggregateTrades(trades), cts.Token)))
        .Concat() // Ensure that the results are serialized.
        .Subscribe(
            _ => { }, // OnNext: nothing to do.
            e =>      // OnError: ignore the expected cancellation, log the rest.
            {
                if (e is TaskCanceledException)
                {
                    // Expected on shutdown; ignore.
                }
                else
                {
                    // Log e here.
                }
            },
            cts.Token);
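One thing to keep in mind with the overload above: OnError is terminal in Rx, so the first failed dispatch ends the whole subscription and later groups are never dispatched. If that is not what you want, a possible variation is to handle the error on each inner observable with Catch, so a failure only ends that one dispatch. The following is a minimal, self-contained sketch of that idea and not the asker's actual pipeline: Demo, Dispatch, RunAsync, the batch numbers and the in-memory "log" list are all hypothetical stand-ins, and it assumes the System.Reactive package and C# 8 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Demo
{
    // Hypothetical stand-in for the question's dispatch delegate:
    // batch 2 faults, batch 3 is already cancelled, the others succeed.
    private static Task Dispatch(int batch, CancellationToken token) => batch switch
    {
        2 => Task.FromException(new InvalidOperationException($"batch {batch} failed")),
        3 => Task.FromCanceled(new CancellationToken(canceled: true)),
        _ => Task.Delay(10, token),
    };

    // Runs four batches one after another and returns the messages that were "logged".
    public static async Task<List<string>> RunAsync()
    {
        var logged = new List<string>(); // stand-in for a real logger
        using var cts = new CancellationTokenSource();

        var pipeline = Observable.Range(1, 4)
            .Select(batch => Observable
                .FromAsync(() => Dispatch(batch, cts.Token))
                .Catch<Unit, Exception>(e =>
                {
                    // Cancellation is expected on shutdown; record everything else.
                    if (!(e is OperationCanceledException))
                    {
                        logged.Add(e.Message);
                    }

                    // Swallow the error so the outer sequence keeps going.
                    return Observable.Empty<Unit>();
                }))
            .Concat(); // Still serialized: one dispatch at a time.

        // Wait until every batch has been attempted.
        await pipeline.LastOrDefaultAsync();
        return logged;
    }
}
```

The check uses OperationCanceledException because TaskCanceledException derives from it, so both are treated as expected cancellation. Whether swallowing a failed dispatch and carrying on is acceptable depends on your requirements; if a failure should stop everything, the OnError overload shown above is the simpler choice.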
