如何等待订户方法完成时,呼叫Onnext和订户在另一个线程/任务上?我想念等待onnext()例如)对于主题...
例如,在同一线程上运行时:
[Test, Explicit]
public void TestSyncOnSameTask()
{
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
using (var subject = new Subject<int>())
using (subject
.Subscribe(
o => Console.WriteLine("Received {1} on threadId:{0}",
Thread.CurrentThread.ManagedThreadId,
o),
() => Console.WriteLine("OnCompleted on threadId:{0}",
Thread.CurrentThread.ManagedThreadId)))
{
subject.OnNext(3);
subject.OnNext(2);
subject.OnNext(1);
subject.OnCompleted();
}
}
将打印输出:
Starting on threadId:10
Received 3 on threadId:10
Received 2 on threadId:10
Received 1 on threadId:10
OnCompleted on threadId:10
在单独的线程上运行时(例如,卸载UI-Thread)
[Test, Explicit]
public void TestObserveOnSeparateTask()
{
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
using (var subject = new Subject<int>())
using (subject.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(
o => { Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
Task.Delay(1000 * o);
},
() => Console.WriteLine("OnCompleted on threadId:{0}",
Thread.CurrentThread.ManagedThreadId)))
{
subject.OnNext(3);
subject.OnNext(2);
subject.OnNext(1);
subject.OnCompleted();
}
}
以下结果将输出:
Starting on threadId:10
Received 3 on threadId:11
您可以看到...使用块,原因是主题呼叫onnext()立即返回,并且不等待完成订户任务的完成。
如何"等待"订户呼叫?保证订单吗?我还没有找到用于同步()方法的好文档以及此方法真正同步的方法。
我正在寻找类似的东西:
await subject.OnNext(3);
await subject.OnNext(2);
在"旧学校"中,我使用了阻止票务作为队列和
public async Task<IResponse> Request(IRequest request)
{
var taskCompletionSource = new TaskCompletionSource<IResponse>();
Task<IResponse> tsk = taskCompletionSource.Task;
Action<IResponse> responseHandler = taskCompletionSource.SetResult;
this.requestProcessor.Process(request, responseHandler);
return await tsk.ConfigureAwait(false);
}
和请求处理器类似于:
while (true)
{
//blockingCollection is shared between caller and runner
blockingCollection.TryTake(out request, Timeout.Infinite);
// call callback when finished to return to awaited method
}
现在,我想改用RX/可观察到的内容,但是这个卸载主题吸引了我……这更多是关于使用反应性扩展的发布/订阅,但我认为我需要在此处进行请求/响应之类的东西。有什么方法可以使用RX?
要完成您想做的事情,您可以做到这一点:
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var are = new AutoResetEvent(false);
using (var subject = new Subject<int>())
{
using (
subject
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
Task.Delay(1000 * o);
}, () =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.Set();
}))
{
subject.OnNext(3);
subject.OnNext(2);
subject.OnNext(1);
subject.OnCompleted();
are.WaitOne();
}
}
Console.WriteLine("Done.");
为什么要阻止?我的猜测是您这样做是因为您认为您需要或在测试中进行测试,并希望阻止测试结束以验证结果。
我将首先解决测试理论。
在RX中,有调度程序的概念(IScheduler
接口)。您使用这些来控制应在何处和何时进行工作。例如。您可以指定可以在TaskPool/ThreadPool,专用线程或调度程序(WPF)上安排工作。您还可以指定是否应尽快完成工作,或将来一段时间。
这些类型是控制时间/并发的首选方法,因此可以删除Task.Delay(1000 * o);
。
要以teh rx IDOMATION的方式复制您提供的测试,您将使用TestScheduler
和支持ITestObserver<T>
和ITestObservable<T>
类型重写(无主题,不要让我开始)。
// Define other methods and classes here
public void TestObserveOnSeparateTask()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable<int>(
ReactiveTest.OnNext(TimeSpan.FromSeconds(1).Ticks, 3),
ReactiveTest.OnNext(TimeSpan.FromSeconds(2).Ticks, 2),
ReactiveTest.OnNext(TimeSpan.FromSeconds(3).Ticks, 1),
ReactiveTest.OnCompleted<int>(TimeSpan.FromSeconds(4).Ticks)
);
var observer = scheduler.CreateObserver<int>();
var subscription = source
.ObserveOn(scheduler)
// .Subscribe(
// o =>
// {
// Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
// //Task.Delay(1000 * o);
// },
// () => Console.WriteLine("OnCompleted on threadId:{0}",
// Thread.CurrentThread.ManagedThreadId));
.Subscribe(observer);
scheduler.Start();
//Pretty silly test, as we apply no query/projection to the source.
// Note we add 1 tick due to it being a relative/cold observable and the ObserveOn scheduling will take one time slice to perform.
ReactiveAssert.AreElementsEqual(
new[] {
ReactiveTest.OnNext(TimeSpan.FromSeconds(1).Ticks + 1, 3),
ReactiveTest.OnNext(TimeSpan.FromSeconds(2).Ticks + 1, 2),
ReactiveTest.OnNext(TimeSpan.FromSeconds(3).Ticks + 1, 1),
ReactiveTest.OnCompleted<int>(TimeSpan.FromSeconds(4).Ticks + 1)
},
observer.Messages);
}
好的,但是如果您想等待某种结果,或者您想知道源已经完成,该怎么办?在这些情况下,您想接受RX和可以应用的连续模式。借助SelectMany
,Concat
等的操作员,您只能在其他序列产生一个值或完成后才获得一些序列来射击。我将这些模式称为异步大门。您可以在51:03的视频中看到马特·巴雷特(Matt Barrett)向他们解释。他展示了在完全异步(RX)系统中如何仅在发生其他前提事件时如何执行某些活动。我认为我不能完全知道您要实现的目标,但我怀疑答案是否涉及" UI线程"。
关于Synchronize()
方法,这纯粹是用于实现未遵循序列化合同的IObservable<T>
序列。您可以应用该操作员来执行该合同。从理论上讲,如果每个人都按照规则播放,则不需要使用。
**编辑讨论CQRS/ES **
关于您的CQRS部分问题,如果您减少耦合,您也可以这样做。我想您希望发出命令有点要求/响应。即,您想知道该命令是否已被系统接受并成功处理。这是一个问题。单独的问题是异步更新读取模型。在这种情况下,我可能只使用Task
/Task<T>
作为接受该命令的方法的返回值。独立地,将读取模型订阅,可观察到的命令由发出命令发出的事件的可观察到的序列表示。
作为一个超级人为的示例,我将最小的DDD ES实现放在一起,我可以想出,以显示命令, grentregates ,存储库/ EventStore 并观察命令中产生的事件。
void Main()
{
var repository = new EventStore();
repository.Events
.OfType<FooEvent>()
.Subscribe(evt=>UpdateReadModel(evt));
var aggreate = new MyAggregate(Guid.NewGuid());
var command = new FooCommand(1);
aggreate.Handle(command);
repository.Save(aggreate);
}
public void UpdateReadModel(FooEvent fooEvent)
{
Console.WriteLine(fooEvent.SomeValue);
}
// Define other methods and classes here
public class FooCommand
{
public FooCommand(int someValue)
{
SomeValue = someValue;
}
public int SomeValue { get; private set; }
}
public class FooEvent
{
public FooEvent(int someValue)
{
SomeValue = someValue;
}
public int SomeValue { get; private set; }
}
public interface IAggregate
{
List<object> GetUncommittedEvents();
void Commit();
}
public class MyAggregate : IAggregate
{
private readonly Guid _id;
private readonly List<object> _uncommittedEvents = new List<object>();
public MyAggregate(Guid id)
{
_id = id;
}
public List<Object> GetUncommittedEvents()
{
return _uncommittedEvents;
}
public void Commit()
{
_uncommittedEvents.Clear();
}
public void Handle(FooCommand command)
{
//As a result of processing the FooCommand we emit the FooEvent
var fooEvent = new FooEvent(command.SomeValue);
_uncommittedEvents.Add(fooEvent);
}
}
public class EventStore
{
private readonly ISubject<object> _events = new ReplaySubject<object>();
public IObservable<object> Events { get { return _events.AsObservable(); } }
public void Save(IAggregate aggregate)
{
foreach (var evt in aggregate.GetUncommittedEvents())
{
_events.OnNext(evt);
}
aggregate.Commit();
}
}
现在在现实世界中,您将使用诸如geteventstore.com,neventstore或cedar.eventstore之类的东西,但是结构保持不变。您将命令按汇总推出,产生事件,将这些事件保存到Eventsource,事件源然后将这些事件(从频段/异步)排放到读取模式中。