.net core AsyncLocal与System.Reactive丢失上下文



我想使用AsyncLocal通过异步工作流传递信息以实现跟踪目的。现在我遇到了RX的问题
Thios是我的测试代码:

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
public class RxTest
{
private readonly Subject<int> test = new Subject<int>();
private readonly AsyncLocal<int> asyncContext = new AsyncLocal<int>();
public void Test()
{
this.test
// .ObserveOn(Scheduler.Default)
.Subscribe(this.OnNextNormal);
this.test
// .ObserveOn(Scheduler.Default)
.Delay(TimeSpan.FromMilliseconds(1))
.Subscribe(this.OnNextDelayed);
for (var i = 0; i < 2; i++)
{
var index = i;
Task.Run(() =>
{
this.asyncContext.Value = index;
Console.WriteLine(
$"Maintt{index} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
this.test.OnNext(index);
});
}
Console.ReadKey();
}
private void OnNextNormal(int obj)
{
Console.WriteLine(
$"OnNextNormalt{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
}
private void OnNextDelayed(int obj)
{
Console.WriteLine(
$"OnNextDelayedt{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
}
}

输出为:

Main 0(线程:5(:AsyncLocal.Value=>0
Main 1(线程:6

如您所见,AsyncLocal.Value不会流到延迟订阅的方法
=>异步值在延迟轨道上丢失

据我所知,普通的Subscribe((不使用调度器,Delay((使用调度器
当我对两个调用都使用ObserveOn((时,两个调用的输出如下

Main 0(线程:5(:AsyncLocal.Value=>0
Main 1(线程:7

=>异步值在每个磁道上都会丢失

有没有办法让ExecutionContext与RX一起流动
我只发现了这个,但问题就在另一边。他们解决了观察者的语境如何流动的问题。我想了解出版商的情况。

我想要实现的是:

  1. 来自"外部"的消息进入我的服务
  2. 在服务内分发消息(RX(
  3. 记录消息时,使用MessageId格式化日志消息
  4. 我不想到处传递信息

提前感谢您的回答。

Rx中自由流动的执行上下文使其适用于大多数多线程场景。您可以通过绕过以下调度方法来强制执行线程上下文:

public static class Extensions
{
public static IObservable<T> TaskPoolDelay<T>(this IObservable<T> observable, TimeSpan delay)
{
return Observable.Create<T>(
observer => observable.Subscribe(
onNext: value => Task.Delay(delay).ContinueWith(_ => observer.OnNext(value)),
onError: observer.OnError,
onCompleted: observer.OnCompleted
)
);
}
}

输出:

OnNextDelayed   2 (Thread: 6): AsyncLocal.Value => 2
OnNextDelayed   3 (Thread: 10): AsyncLocal.Value => 3
OnNextDelayed   1 (Thread: 7): AsyncLocal.Value => 1
OnNextDelayed   0 (Thread: 5): AsyncLocal.Value => 0

这确实会延续上下文,但对于较大的查询来说,它很快就会变得复杂。我不确定实现一个在通知时保留上下文的IScheduler是否能很好地工作。如果消息复制不需要太多开销,那么这可能最适合Rx。

相关内容

  • 没有找到相关文章

最新更新