如何等待完成的IOBServer呼叫,包括观察订户电话



如何等待订户方法完成时,呼叫Onnext和订户在另一个线程/任务上?我想念等待onnext()例如)对于主题...

例如,在同一线程上运行时:

    [Test, Explicit]
    public void TestSyncOnSameTask()
    {
        Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
        using (var subject = new Subject<int>())
        using (subject
            .Subscribe(
                o => Console.WriteLine("Received {1} on threadId:{0}",
                    Thread.CurrentThread.ManagedThreadId,
                    o),
                () => Console.WriteLine("OnCompleted on threadId:{0}",
                    Thread.CurrentThread.ManagedThreadId)))
        {
            subject.OnNext(3);
            subject.OnNext(2);
            subject.OnNext(1);
            subject.OnCompleted();
        }
    }

将打印输出:

Starting on threadId:10
Received 3 on threadId:10
Received 2 on threadId:10
Received 1 on threadId:10
OnCompleted on threadId:10

在单独的线程上运行时(例如,卸载UI-Thread)

    [Test, Explicit]
    public void TestObserveOnSeparateTask()
    {
        Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
        using (var subject = new Subject<int>())
        using (subject.ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(
                o => { Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                         Task.Delay(1000 * o);
                },
                () => Console.WriteLine("OnCompleted on threadId:{0}",
                    Thread.CurrentThread.ManagedThreadId)))
        {
            subject.OnNext(3);
            subject.OnNext(2);
            subject.OnNext(1);
            subject.OnCompleted();
        }
    }

以下结果将输出:

Starting on threadId:10
Received 3 on threadId:11

您可以看到...使用块,原因是主题呼叫onnext()立即返回,并且不等待完成订户任务的完成。

如何"等待"订户呼叫?保证订单吗?我还没有找到用于同步()方法的好文档以及此方法真正同步的方法。

我正在寻找类似的东西:

await subject.OnNext(3);
await subject.OnNext(2);

在"旧学校"中,我使用了阻止票务作为队列和

    public async Task<IResponse> Request(IRequest request)
    {
        var taskCompletionSource = new TaskCompletionSource<IResponse>();
        Task<IResponse> tsk = taskCompletionSource.Task;
        Action<IResponse> responseHandler = taskCompletionSource.SetResult; 
        this.requestProcessor.Process(request, responseHandler);
        return await tsk.ConfigureAwait(false);
    }

和请求处理器类似于:

        while (true)
        {
            //blockingCollection is shared between caller and runner
            blockingCollection.TryTake(out request, Timeout.Infinite);
            // call callback when finished to return to awaited method
        }

现在,我想改用RX/可观察到的内容,但是这个卸载主题吸引了我……这更多是关于使用反应性扩展的发布/订阅,但我认为我需要在此处进行请求/响应之类的东西。有什么方法可以使用RX?

要完成您想做的事情,您可以做到这一点:

Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var are = new AutoResetEvent(false);
using (var subject = new Subject<int>())
{
    using (
        subject
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(o =>
            {
                Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                Task.Delay(1000 * o);
            }, () =>
            {
                Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                are.Set();
            }))
        {
            subject.OnNext(3);
            subject.OnNext(2);
            subject.OnNext(1);
            subject.OnCompleted();
            are.WaitOne();
        }
}
Console.WriteLine("Done.");

为什么要阻止?我的猜测是您这样做是因为您认为您需要或在测试中进行测试,并希望阻止测试结束以验证结果。

我将首先解决测试理论。

在RX中,有调度程序的概念(IScheduler接口)。您使用这些来控制应在何处和何时进行工作。例如。您可以指定可以在TaskPool/ThreadPool,专用线程或调度程序(WPF)上安排工作。您还可以指定是否应尽快完成工作,或将来一段时间。

这些类型是控制时间/并发的首选方法,因此可以删除Task.Delay(1000 * o);

要以teh rx IDOMATION的方式复制您提供的测试,您将使用TestScheduler和支持ITestObserver<T>ITestObservable<T>类型重写(无主题,不要让我开始)。

// Define other methods and classes here
public void TestObserveOnSeparateTask()
{
    var scheduler = new TestScheduler();
    var source = scheduler.CreateColdObservable<int>(
        ReactiveTest.OnNext(TimeSpan.FromSeconds(1).Ticks, 3),
        ReactiveTest.OnNext(TimeSpan.FromSeconds(2).Ticks, 2),
        ReactiveTest.OnNext(TimeSpan.FromSeconds(3).Ticks, 1),
        ReactiveTest.OnCompleted<int>(TimeSpan.FromSeconds(4).Ticks)
        );
    var observer = scheduler.CreateObserver<int>();
    var subscription = source
        .ObserveOn(scheduler)
//      .Subscribe(
//          o =>
//          {
//              Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
//              //Task.Delay(1000 * o);
//          },
//          () => Console.WriteLine("OnCompleted on threadId:{0}",
//              Thread.CurrentThread.ManagedThreadId));
        .Subscribe(observer);
    scheduler.Start();
    //Pretty silly test, as we apply no query/projection to the source.
    //  Note we add 1 tick due to it being a relative/cold observable and the ObserveOn scheduling will take one time slice to perform.
    ReactiveAssert.AreElementsEqual(
        new[] {
            ReactiveTest.OnNext(TimeSpan.FromSeconds(1).Ticks + 1, 3),
            ReactiveTest.OnNext(TimeSpan.FromSeconds(2).Ticks + 1, 2),
            ReactiveTest.OnNext(TimeSpan.FromSeconds(3).Ticks + 1, 1),
            ReactiveTest.OnCompleted<int>(TimeSpan.FromSeconds(4).Ticks + 1)
        },
        observer.Messages);
}

好的,但是如果您想等待某种结果,或者您想知道源已经完成,该怎么办?在这些情况下,您想接受RX和可以应用的连续模式。借助SelectManyConcat等的操作员,您只能在其他序列产生一个值或完成后才获得一些序列来射击。我将这些模式称为异步大门。您可以在51:03的视频中看到马特·巴雷特(Matt Barrett)向他们解释。他展示了在完全异步(RX)系统中如何仅在发生其他前提事件时如何执行某些活动。我认为我不能完全知道您要实现的目标,但我怀疑答案是否涉及" UI线程"。

,我怀疑答案是否阻止。

关于Synchronize()方法,这纯粹是用于实现未遵循序列化合同的IObservable<T>序列。您可以应用该操作员来执行该合同。从理论上讲,如果每个人都按照规则播放,则不需要使用。

**编辑讨论CQRS/ES **

关于您的CQRS部分问题,如果您减少耦合,您也可以这样做。我想您希望发出命令有点要求/响应。即,您想知道该命令是否已被系统接受并成功处理。这是一个问题。单独的问题是异步更新读取模型。在这种情况下,我可能只使用Task/Task<T>作为接受该命令的方法的返回值。独立地,将读取模型订阅,可观察到的命令由发出命令发出的事件的可观察到的序列表示。

作为一个超级人为的示例,我将最小的DDD ES实现放在一起,我可以想出,以显示命令 grentregates 存储库/ EventStore 并观察命令中产生的事件。

void Main()
{
    var repository = new EventStore();
    repository.Events
        .OfType<FooEvent>()
        .Subscribe(evt=>UpdateReadModel(evt));
    var aggreate = new MyAggregate(Guid.NewGuid());
    var command = new FooCommand(1);
    aggreate.Handle(command);
    repository.Save(aggreate);
}
public void UpdateReadModel(FooEvent fooEvent)
{
    Console.WriteLine(fooEvent.SomeValue);
}
// Define other methods and classes here
public class FooCommand
{
    public FooCommand(int someValue)
    {
        SomeValue = someValue;
    }
    public int SomeValue { get; private set; }
}
public class FooEvent
{
    public FooEvent(int someValue)
    {
        SomeValue = someValue;
    }
    public int SomeValue { get; private set; }
}
public interface IAggregate
{
    List<object> GetUncommittedEvents();
    void Commit();
}
public class MyAggregate : IAggregate
{
    private readonly Guid _id;
    private readonly List<object> _uncommittedEvents = new List<object>();
    public MyAggregate(Guid id)
    {
        _id = id;
    }
    public List<Object> GetUncommittedEvents()
    {
        return _uncommittedEvents;
    }
    public void Commit()
    {
        _uncommittedEvents.Clear();
    }
    public void Handle(FooCommand command)
    {
        //As a result of processing the FooCommand we emit the FooEvent
        var fooEvent = new FooEvent(command.SomeValue);
        _uncommittedEvents.Add(fooEvent);
    }
}
public class EventStore
{
    private readonly ISubject<object> _events = new ReplaySubject<object>();
    public IObservable<object> Events { get { return _events.AsObservable(); } }
    public void Save(IAggregate aggregate)
    {
        foreach (var evt in aggregate.GetUncommittedEvents())
        {
            _events.OnNext(evt);
        }
        aggregate.Commit();
    }
}

现在在现实世界中,您将使用诸如geteventstore.com,neventstore或cedar.eventstore之类的东西,但是结构保持不变。您将命令按汇总推出,产生事件,将这些事件保存到Eventsource,事件源然后将这些事件(从频段/异步)排放到读取模式中。

最新更新