我通过ZMQ读取数据,并根据主题在单独的线程中执行任务。我注意到,当数据频率非常高(每1ms约1条消息)时,一些任务需要很长时间才能执行。
这就是我所做的:
while(true){
item = zmqSubscriber.ReceiveData(out topic, out ConsumeErrorMsg);
if (topic.Equals(topic1))
{
Task.Run(() => ExecuteTask1(item));
}
else if (topic.Equals(topic2)
{
Task.Run(() => ExecuteTask2(item));
}
else if (topic.Equals(topic3))
{
Task.Run(() => ExecuteTask3(item));
}
}
当数据频率稍低时(每100毫秒10条消息),我没有注意到任何行为问题。
我是c#的新手,我想知道这是否可能是由于活动线程池线程的最大数量很低。我在这里读到这个数字可以增加,但是,这不是一个好的实践:ThreadPool。SetMinThreads(Int32, Int32) Method
所以我想知道是否有更好的方法来实现我想做的事情??
与其让没完没了的任务转起来,还不如使用DataFlow或Rx来帮助分区、排队和管理你的工作负载。
它们都满足同步和异步操作,可以接受取消令牌,管理并行度,并在需要时提供反压力。你也可以把它推到其他的管道中。
var options = new ExecutionDataflowBlockOptions()
{
//BoundedCapacity = <= set this if you want back pressure
//CancellationToken = token <= set this if you like cancelling stuff
//MaxDegreeOfParallelism = <= set this if you want limited parallelism
SingleProducerConstrained = true
};
// This could all be done in the one action block,
// or different options for each block depending on your needs
var action1 = new ActionBlock<Message>(ExecuteTask1,options);
var action2 = new ActionBlock<Message>(ExecuteTask2,options);
var action3 = new ActionBlock<Message>(ExecuteTask3,options);
while (true)
{
var item = zmqSubscriber.ReceiveData(out topic, out ConsumeErrorMsg);
topic switch
{
topic1 => await action1.SendAsync(ConsumeErrorMsg,token),
topic2 => await action2.SendAsync(ConsumeErrorMsg,token),
topic3 => await action3.SendAsync(ConsumeErrorMsg,token),
};
}
<子>免责声明:这不是一个关于DataFlow的教程,你需要研究这项技术,审查和适应任何像这样的解决方案,以满足你的需求。子>
在消息速度超过处理速度的情况下,您还应该实现一些节流策略。