我有一个生产者,该生产商从REST API中下载数据,以及几个处理页面的消费者(例如,将它们加载到数据库(。
我想让生产商和消费者并行工作,这意味着生产者不应在下载下一个页面之前等待消耗页面。每个消费者都需要顺序处理页面。
下载所有页面时,主线程应等待所有消费者完成工作(因为消费可能比生产时间更长(。
我当前的方法如下:
我创建了一个可观察到的下载页面的可观察,该页面在附加了消费者订阅者后立即开始。我配置了订户要观察他们自己的并行执行的线程。
C#中的代码:
IEnumerable<Page> getPages = produce();
var observable = getPages.ToObservable().Publish();
observable
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(page => consume1(page));
observable
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(page => consume2(page));
observable.Connect();
此实现的问题是主线程可能在处理所有页面并停止所有页面之前完成。
如何使用RX实现此目标?
谢谢!
编辑:
也尝试了以下方法(来自答案(:
static void Main(string[] args)
{
var getPages = Enumerable.Range(0, 10);
var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();
var observable =
getPages
.ToObservable()
.Publish(ps =>
Observable
.Merge(
ps.Select(p => Observable.Start(() => consume1(p), els1)),
ps.Select(p => Observable.Start(() => consume2(p), els2))));
observable.Wait();
}
public static void consume1(int p)
{
Console.WriteLine($"1:{p}");
Thread.Sleep(200);
}
public static void consume2(int p)
{
Console.WriteLine($"2:{p}");
Thread.Sleep(100);
}
可观察的。输出为:
1:0
2:0
只是为了证明,如果我们将getPages替换为:
var getPages = Enumerable.Range(0, 10)
.Select(i =>
{
Console.WriteLine($"Produced {i}");
Thread.Sleep(30);
return i;
});
然后输出为:
Produced 0
Produced 1
1:0
2:0
Produced 2
Produced 3
Produced 4
2:1
Produced 5
Produced 6
Produced 7
1:1
2:2
Produced 8
Produced 9
我认为这可以做您想要的:
var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();
var observable =
getPages
.ToObservable()
.Publish(ps =>
Observable
.Merge(
ps.SelectMany(p => Observable.Start(() => consume1(p), els1)),
ps.SelectMany(p => Observable.Start(() => consume2(p), els2))));
我写了此测试代码:
var getPages = Enumerable.Range(0, 10);
var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();
var observable =
getPages
.ToObservable()
.Publish(ps =>
Observable
.Merge(
ps.SelectMany(p => Observable.Start(() => consume1(p), els1)),
ps.SelectMany(p => Observable.Start(() => consume2(p), els2))));
observable.Wait();
public void consume1(int p)
{
Console.WriteLine($"1:{p}");
Thread.Sleep(200);
}
public void consume2(int p)
{
Console.WriteLine($"2:{p}");
Thread.Sleep(100);
}
我得到了此输出:
1:02:02:11:12:22:31:22:42:51:32:62:71:42:82:91:51:61:71:81:9
完成EventLoopScheduler
实例后,应该在它们上致电.Dispose()
关闭线程。