我是Rx的新手,我正在尝试使用它来拥有多达X个并发订阅任务。数据源实际上来自数据库,因此我必须轮询数据库。我意识到 Rx 背后的想法是它是推而不是拉- 所以民意调查不太适合,但从概念上讲,进入数据库的数据是我想订阅并做某事的事件流。
我遇到的主要问题是LimitedConcurrencyLevelTaskScheduler
似乎没有成功地限制指定数量的任务。它比我指定的 8 个并发运行得更多。
我也不确定以下两个解决方案中哪个是更好的方法(或者可能两者都错了?!
这是我尝试过的一种方法,它使用Observable.Timer
...
public static void Main()
{
var taskFactory = new TaskFactory (new LimitedConcurrencyLevelTaskScheduler (8));
var scheduler = new TaskPoolScheduler (taskFactory);
Observable.Timer (TimeSpan.FromMilliseconds (10), scheduler)
.SelectMany (x => Observable.FromAsync (GetItemsSource))
.Repeat ()
.ObserveOn (scheduler)
.Subscribe (x => Observable.FromAsync(y => DoSomethingAsync (x.ToList())));
Console.ReadKey ();
}
private static async Task<IEnumerable<Guid>> GetItemsSource()
{
return await _myRepo.GetMoreAsync(10);
}
private static async Task DoSomethingAsync(IEnumerable<Guid> items)
{
// Do something with the data
}
我也尝试过这样做...
public static void Main()
{
GetItemsSource()
.ObserveOn(scheduler)
.Select (async x => await DoSomethingAsync(x))
.Subscribe();
Console.ReadKey ();
}
public static IObservable<Guid> GetItemsSource()
{
return Observable.Create<Guid>(
async obs =>
{
while (true)
{
var item = (await _myRepo.GetMoreAsync(1)).FirstOrDefault();
if(item != null)
{
obs.OnNext(item);
}
await Task.Delay(TimeSpan.FromMilliseconds(10))
}
});
}
private static async Task DoSomethingAsync(IEnumerable<Guid> items)
{
// Do something with the data
}
显然非常简单的示例,没有错误处理或取消支持。
两者都似乎有效,但都不限于 8 个并发任务。
正如我所说,我对 Rx 很陌生,可能缺少很多基本的东西。我当然计划做大量的阅读以完全理解 Rx,因为它看起来非常强大,但现在我想让一些东西快速工作。
更新
继Enigmativity的答案和评论之后,这里有一些代码可以记录并发计数...
void Main()
{
var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(8));
var scheduler = new TaskPoolScheduler(taskFactory);
using (
(
from n in Observable.Interval(TimeSpan.FromMilliseconds(10), scheduler)
from g in Observable.FromAsync(GetItemsSource, scheduler)
from u in Observable.FromAsync(() => DoSomethingAsync(g), scheduler)
select u)
.ObserveOn(scheduler)
.Subscribe())
{
Console.ReadLine();
}
}
private static volatile int _numIn = 0;
private static volatile int _numOut = 0;
public static async Task<IEnumerable<Guid>> GetItemsSource()
{
try
{
_numIn++;
$"Concurrent tasks (in): {_numIn}".Dump();
// Simulate async API call
await Task.Delay(TimeSpan.FromMilliseconds(10));
return new List<Guid> { Guid.NewGuid() };
}
finally
{
_numIn--;
}
}
private static async Task DoSomethingAsync(IEnumerable<Guid> deliveryIds)
{
try
{
_numOut++;
// Simulate async calls required to process the event
await Task.Delay(TimeSpan.FromMilliseconds(1000));
$"Concurrent tasks (out): {_numOut}".Dump();
}
finally
{
_numOut--;
}
}
这显示了大约 64 个并发任务正在运行。
更新 2
看起来确实是因为订阅者是异步的。如果我使用非异步订阅者进行测试,它工作正常。不幸的是,我需要一个异步订阅者,因为它需要调用其他异步方法。
看起来我可以通过这样做做类似的事情......
GetItemsSource2()
.Select(x => Observable.FromAsync(() => DoSomethingAsync(x)))
.Merge(64)
.Subscribe();
所以使用Merge
而不是LimitedConcurrencyLevelTaskScheduler
.
我通过这样做让你的代码工作:
void Main()
{
var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(8));
var scheduler = new TaskPoolScheduler(taskFactory);
using (Observable.Timer(TimeSpan.FromMilliseconds(10), scheduler)
.SelectMany(x => Observable.FromAsync(GetItemsSource))
.Repeat()
.ObserveOn(scheduler)
.Subscribe(async x => await DoSomethingAsync(x.ToList())))
{
Console.ReadLine();
};
}
private static async Task<IEnumerable<Guid>> GetItemsSource()
{
return await Task.Run(() => Enumerable.Range(0, 10).Select(x => Guid.NewGuid()).ToArray());
}
private static async Task DoSomethingAsync(IEnumerable<Guid> items)
{
await Task.Run(() => Console.WriteLine(String.Join("|", items.Select(x => x.ToString()))));
}
我还修改了LimitedConcurrencyLevelTaskScheduler
的QueueTask
方法,以包含正在运行的委托数的跟踪线:
protected sealed override void QueueTask(Task task)
{
// Add the task to the list of tasks to be processed. If there aren't enough
// delegates currently queued or running to process tasks, schedule another.
lock (_tasks)
{
Console.WriteLine(_delegatesQueuedOrRunning);
_tasks.AddLast(task);
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
{
++_delegatesQueuedOrRunning;
NotifyThreadPoolOfPendingWork();
}
}
}
当我运行您的代码时,我得到了以下输出:
0 1 1 1 874695CA-E9A8-4688-A4D2-D7D2446E7924|e3cbadf1-c5cb-4339-ab59-33d873db98f2|8dd710f5-0b21-4b20-a547-6ccfe7c29af4|42739c5a-4602-4f76-8aed-40f1c0ffccdc|c599e879-06ca-459f-b27e-95ac15dc4cc1|562b5ff-c5fa 7C-dcb2-47d8-aa4a-503499654139|b78e5fc5-d152-4380-9799-2713c6f71c19|e47669a3-a399-4891-91b6-3a28b52a941a|f6483f2f-f8d7-47e8-9f88-bf9bc5f61f3a|c8e75203-bc55-4e00-9f8b-ecf248f81454 2 2 1 2 3 92d6f76e-6a3d-475d-8c1c-bd3e59aaf2a1|1b2bd0d6-c439-4b3c-a1f1-9af1f5132afc|c140e0ec-6741-4310-9edf-547fdf390b01|5b29dcd3-21c9-46a2-ae98-cd7a7b1a003c|5a808def-a09f-41a7-acfc-d11cfbbb4faa|28f4f4 7d0c-7762-4949-ae33-427c82756874|13087b1c-c4eb-4f0f-bf5f-665a2298ac13|50c00907-668d-44f3-9e2a-c790348fb715|34f8602a-18ef-45b6-b069-0fb30718a45b|46d6616d-0e89-49cd-8905-dff72bb63add 2 1 1 2 7c6cbe5a-43d1-4aad-8eee-33a0d7f44276|2164e3e0-4e8d-4a57-99b7-0aa5e42281da|e2cf26ed-501f-4032-9761-53a301e16bb9|a3e9171a-f490-4135-a930-017dc706293e|b9f43f5c-d652-4205-b857-724699f17 8C5|5D8E6149-F9AD-424D-9ee2-07AFA344ab80|418d2526-ADF6-4C26-AC84-636400FCE547|3b8c3b14-9e91-4bb8-9cfa-c1d36f12ccc0|be3c8c84-5112-4c85-ba7f-d1b41a2ec03a|bdc34ccb-a3d9-45fa-bf54-d42bcd791081 2 1 1 2 3d5e0d0a-ddb3-4595-b960-4d2050d4fa1a|1b5a39c9-652c-4872-8d1c-6e1212cd4043|fd9aff7b-9c77-47e4-a4c6-2ede38472b92|ff6145d3-2dc2-45b6-bc40-2cc1f270572d|5ba0e441-a9f1-4b2a-baca-127cce622993|e2650bb9-f2e4-4a89-8c2c-69859f5f381a|a86a1f4e-7ea7-465c-9730-7ac735c5616e|5cf7135b-fafe-4725-938e-5e7ea55f4c3e|dd26bda7-d86b-4bb9-977f-a29ef5cf6d76|6e77c1c1-4e8d-4b48-b6f4-d54f0ec9d269 1 1
...等。。。
您的代码永远不会命中超过 3 个并发任务。
现在我建议你尝试用一种更像Rx的方式编写代码。试试这个:
using (
(
from n in Observable.Interval(TimeSpan.FromSeconds(1.0), scheduler)
from g in Observable.FromAsync(() => GetItemsSource(), scheduler)
from u in Observable.FromAsync(() => DoSomethingAsync(g), scheduler)
select u)
.ObserveOn(scheduler)
.Subscribe())
{
Console.ReadLine();
};
它不会改变使用的任务数量,但它更干净。