.Net Core 3.1应用程序中的ExecuteAsync中等待之后的后台服务块



我需要一个服务,它可以连续轮询Kafka主题中的消息,将消息交给一个处理任务,并将该任务的结果推送到另一个Kafka话题。我使用.Net Core 3.1中的BackgroundService作为基本基础设施,同时我编写了一个ConsumerProducer类,该类封装了Kafka内容,该类具有以下公共方法:

public async Task ReceiveAndReply(ReplyCallback replyCallback)
{
try
{
var msg = consumer.Pull();
var reply = await replyCallback(msg);
producer.PushAsync((message)reply);
}
catch (Exception e)
{
throw new Exception(e.InnerException.Message);
};
}

BackgroundService派生类如下:

public class Worker : BackgroundService{
// initialization stuff....
protected ReplyCallback processDelegate = new ReplyCallback(ProcessQuery);
protected static Task<message> ProcessQuery(message source)
{
return new Task<message>>(() => 
{

try
{
var messagePayload= ProcessPulledMessage(source);
var replyMessage = new Message(messagePayload);
return replyMessage;
}
catch(Exception ex)
{
Console.WriteLine(ex.Message);
return null;
}
});
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
await connector.ReceiveAndReply(processDelegate);
}
catch (Exception ex)
{
throw new Exception(ex.InnerException.Message);
}
}
}

}

代码被简化了,但它描述了情况。在运行时,我可以使用主题中的消息,但当在ProcessQuery中等待任务时,执行退出,应用程序继续运行,但等待的任务从未执行。也许我做了一些完全错误的事情,我不是复杂异步工作的专家,所以任何提示都是非常受欢迎的。谢谢

您正在返回一个未启动的任务。

Task构造函数的文档指出:

实例化Task对象并启动任务的最常见方法不是调用此构造函数,而是调用静态Task.Run(Action(或TaskFactory.StartNew(Action(方法。

相关内容

最新更新