使用Task.Run()时延迟



我通过ZMQ读取数据,并根据主题在单独的线程中执行任务。我注意到,当数据频率非常高(每1ms约1条消息)时,一些任务需要很长时间才能执行。

这就是我所做的:

while(true){
item = zmqSubscriber.ReceiveData(out topic, out ConsumeErrorMsg);
if (topic.Equals(topic1))
{
Task.Run(() => ExecuteTask1(item));
}
else if (topic.Equals(topic2)
{
Task.Run(() => ExecuteTask2(item));
}
else if (topic.Equals(topic3))
{
Task.Run(() => ExecuteTask3(item));
}
}

当数据频率稍低时(每100毫秒10条消息),我没有注意到任何行为问题。

我是c#的新手,我想知道这是否可能是由于活动线程池线程的最大数量很低。我在这里读到这个数字可以增加,但是,这不是一个好的实践:ThreadPool。SetMinThreads(Int32, Int32) Method

所以我想知道是否有更好的方法来实现我想做的事情??

与其让没完没了的任务转起来,还不如使用DataFlowRx来帮助分区、排队和管理你的工作负载。

它们都满足同步和异步操作,可以接受取消令牌,管理并行度,并在需要时提供反压力。你也可以把它推到其他的管道中。

var options = new ExecutionDataflowBlockOptions()
{
//BoundedCapacity = <= set this if you want back pressure
//CancellationToken = token <= set this if you like cancelling stuff
//MaxDegreeOfParallelism = <= set this if you want limited parallelism
SingleProducerConstrained = true
};
// This could all be done in the one action block,
// or different options for each block depending on your needs
var action1 = new ActionBlock<Message>(ExecuteTask1,options);
var action2 = new ActionBlock<Message>(ExecuteTask2,options);
var action3 = new ActionBlock<Message>(ExecuteTask3,options);
while (true)
{
var item = zmqSubscriber.ReceiveData(out topic, out ConsumeErrorMsg);
topic switch
{
topic1 =>  await action1.SendAsync(ConsumeErrorMsg,token),
topic2 =>  await action2.SendAsync(ConsumeErrorMsg,token),
topic3 =>  await action3.SendAsync(ConsumeErrorMsg,token),  
};
}

<子>免责声明:这不是一个关于DataFlow的教程,你需要研究这项技术,审查和适应任何像这样的解决方案,以满足你的需求。

在消息速度超过处理速度的情况下,您还应该实现一些节流策略。

最新更新