等待foreach()启动后重叠任务



这是我的场景。在IHostedService中,我需要订阅GRPC通道,并使用await foreach()处理消息。

var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new Messenger.MessengerClient(channel);
var messages= client.SubscribeToMessages();
await foreach (var message in messages.ResponseStream.ReadAllAsync())
{
// do something with message
}

由于我订阅了流,所以我可能错过了服务器在我的客户端订阅之前发送的消息。MessengerClient有一种方法,我可以在其中获得当天所有消息的快照:

var snapshotMessages = client.GetTodaysMessages();

我想打电话给客户。GetTodaysMessages()在订阅流之后,这样会有重叠,我保证会得到所有东西。

我正在考虑在等待foreach(…)之前启动一个计时器,然后给客户打电话。计时器回调中的GetTodaysMessages()。你觉得这种方法怎么样?

一个要求是,如果我与GRPC流断开连接,我需要重新订阅并呼叫客户端。再次获取今日消息()。

我愿意接受你可能有的建议

Microsoft有一个名为异步编程的文档,它可以在这里为您提供帮助。

本质上,您可以将调用分配给Task,而不是调用await,CCD_2将在您能够执行其他操作时运行。准备好后,您可以等待Task完成。Task也有一个Result属性,它包含您等待的代码的结果。

有一种非常有用的方法可以等待多个任务,方法是对正在等待的任务数组调用Task.WhenAny()

var breakfastTasks = new List<Task> { eggsTask, baconTask, toastTask };
while (breakfastTasks.Count > 0)
{
Task finishedTask = await Task.WhenAny(breakfastTasks);
if (finishedTask == eggsTask)
{
Console.WriteLine("Eggs are ready");
}
else if (finishedTask == baconTask)
{
Console.WriteLine("Bacon is ready");
}
else if (finishedTask == toastTask)
{
Console.WriteLine("Toast is ready");
}
breakfastTasks.Remove(finishedTask);
}

你可以试着让你的前臂像Task一样(这是即兴写的,我没有试图编译它,但它可能会对你有所帮助):

var task = messages.ResponseStream.ReadAllAsync();
// do other work here, while the task runs
var snapshotMessages = client.GetTodaysMessages();
// and then deal with the result of the message stream
await task;
foreach(var message in task.Result){ 
// process message
}

XY问题。

解决你的问题的办法是不要有你的问题。

"由于我订阅了一个流,所以我有可能错过了消息;。

这是gRPC的一个特点

选择一种技术;由于我使用的是X,所以我没有错过消息的机会;。您希望使用消息队列而不是Observable模式。看看

  • MSMQ
  • Rabbit MQ
  • MQTT
  • MassTransit
  • Redis
  • Tibco
  • SimpleMQ
  • 等等

消息代理将用零代码解决您的问题。

我想你应该这样做:

var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new Messenger.MessengerClient(channel);
var messages = client.SubscribeToMessages();
// Get the stream going so that you don't miss any
var messagesEnumerableTask = messages.ResponseStream.ReadAllAsync();
// Get today's messages
var snapshotMessages = await client.GetTodaysMessages(); // I'm asuming this is actually awaitable?
// Process today's messages
foreach (var message in snapshotMessages)
{
if (!IsProcessed(message))
{
Process(message);
} 
}
// Process the stream
await foreach (var message in messagesEnumerableTask)
{
if (!IsProcessed(message))
{
Process(message);
}
}

我想您将有某种方法来确定是否已经看到消息(例如,一个名为IsProcessed(message)的方法)。

最新更新