我如何才能看到我的反应式扩展查询正在做什么



我正在编写一个包含大量运算符的复杂反应扩展查询。我怎么能看到发生了什么?

我问并回答了这个问题,因为它出现了很多,可能有很好的通用性。

您可以在开发Rx操作符时将此函数大量附加到它们,以查看发生了什么:

    public static IObservable<T> Spy<T>(this IObservable<T> source, string opName = null)
    {
        opName = opName ?? "IObservable";
        Console.WriteLine("{0}: Observable obtained on Thread: {1}",
                          opName,
                          Thread.CurrentThread.ManagedThreadId);
        return Observable.Create<T>(obs =>
        {
            Console.WriteLine("{0}: Subscribed to on Thread: {1}",
                              opName,
                              Thread.CurrentThread.ManagedThreadId);
            try
            {
                var subscription = source
                    .Do(x => Console.WriteLine("{0}: OnNext({1}) on Thread: {2}",
                                                opName,
                                                x,
                                                Thread.CurrentThread.ManagedThreadId),
                        ex => Console.WriteLine("{0}: OnError({1}) on Thread: {2}",
                                                 opName,
                                                 ex,
                                                 Thread.CurrentThread.ManagedThreadId),
                        () => Console.WriteLine("{0}: OnCompleted() on Thread: {1}",
                                                 opName,
                                                 Thread.CurrentThread.ManagedThreadId)
                    )
                    .Subscribe(obs);
                return new CompositeDisposable(
                    subscription,
                    Disposable.Create(() => Console.WriteLine(
                          "{0}: Cleaned up on Thread: {1}",
                          opName,
                          Thread.CurrentThread.ManagedThreadId)));
            }
            finally
            {
                Console.WriteLine("{0}: Subscription completed.", opName);
            }
        });
    }

以下是一个使用示例,显示了Range:的细微行为差异

Observable.Range(0, 1).Spy("Range").Subscribe();

给出输出:

Range: Observable obtained on Thread: 7
Range: Subscribed to on Thread: 7
Range: Subscription completed.
Range: OnNext(0) on Thread: 7
Range: OnCompleted() on Thread: 7
Range: Cleaned up on Thread: 7

但是这个:

Observable.Range(0, 1, Scheduler.Immediate).Spy("Range").Subscribe();

给出输出:

Range: Observable obtained on Thread: 7
Range: Subscribed to on Thread: 7
Range: OnNext(0) on Thread: 7
Range: OnCompleted() on Thread: 7
Range: Subscription completed.
Range: Cleaned up on Thread: 7

发现差异吗?

显然,您可以将其更改为写入日志或调试,或者使用预处理器指令在Release构建等上进行精简传递订阅

您可以在整个操作符链中应用Spy。例如:

Observable.Range(0,3).Spy("Range")
          .Scan((acc, i) => acc + i).Spy("Scan").Subscribe();

给出输出:

Range: Observable obtained on Thread: 7
Scan: Observable obtained on Thread: 7
Scan: Subscribed to on Thread: 7
Range: Subscribed to on Thread: 7
Range: Subscription completed.
Scan: Subscription completed.
Range: OnNext(1) on Thread: 7
Scan: OnNext(1) on Thread: 7
Range: OnNext(2) on Thread: 7
Scan: OnNext(3) on Thread: 7
Range: OnCompleted() on Thread: 7
Scan: OnCompleted() on Thread: 7
Range: Cleaned up on Thread: 7
Scan: Cleaned up on Thread: 7

我相信你可以找到丰富这一点的方法来满足你的目的。

又过了三年,我仍然在使用你的想法。我的版本现在发展如下:

  • 用于选择日志记录目的地的过载
  • 日志订阅数
  • 记录来自坏订阅者的"下游"异常

代码:

public static IObservable<T> Spy<T>(this IObservable<T> source, string opName = null)
{
    return Spy(source, opName, Console.WriteLine);
}
public static IObservable<T> Spy<T>(this IObservable<T> source, string opName, 
                                                              Action<string> logger)
{
    opName = opName ?? "IObservable";
    logger($"{opName}: Observable obtained on Thread: {Thread.CurrentThread.ManagedThreadId}");
    var count = 0;
    return Observable.Create<T>(obs =>
    {
        logger($"{opName}: Subscribed to on Thread: {Thread.CurrentThread.ManagedThreadId}");
        try
        {
            var subscription = source
                .Do(x => logger($"{opName}: OnNext({x}) on Thread: {Thread.CurrentThread.ManagedThreadId}"),
                    ex => logger($"{opName}: OnError({ex}) on Thread: {Thread.CurrentThread.ManagedThreadId}"),
                    () => logger($"{opName}: OnCompleted() on Thread: {Thread.CurrentThread.ManagedThreadId}")
                )
                .Subscribe(t =>
                {
                    try
                    {
                        obs.OnNext(t);
                    }
                    catch(Exception ex)
                    {
                        logger($"{opName}: Downstream exception ({ex}) on Thread: {Thread.CurrentThread.ManagedThreadId}");
                        throw;
                    }
                }, obs.OnError, obs.OnCompleted);
            return new CompositeDisposable(
                    Disposable.Create(() => logger($"{opName}: Dispose (Unsubscribe or Observable finished) on Thread: {Thread.CurrentThread.ManagedThreadId}")),
                    subscription,
                    Disposable.Create(() => Interlocked.Decrement(ref count)),
                    Disposable.Create(() => logger($"{opName}: Dispose (Unsubscribe or Observable finished) completed, {count} subscriptions"))
                );
        }
        finally
        {
            Interlocked.Increment(ref count);
            logger($"{opName}: Subscription completed, {count} subscriptions.");
        }
    });
}

相关内容

  • 没有找到相关文章

最新更新