问题:我订阅了一个永无止境的消息传递服务,我的代码需要检查是否有消息满足条件,如果满足,则在处理所有消息并返回true之前关闭订阅。如果我已经处理了所有消息并且不满足条件,那么我需要关闭订阅并返回false。
例如:foo = 5
:
message dataset early success :
msg1: foo=1
msg2: foo=2
msg3: foo=5 <= condition satisfied, return true and stop processing
msg4: foo=6
message dataset failure :
msg1: foo=1
msg2: foo=2
msg3: foo=3
msg4: foo=4 <= no more messages, return false and stop processing
我使用的订阅有一个同步方法,我必须传递一个async EventHandler
。以下是我的功能代码,适用于这两种情况,lastMessageReceivedDateTime
跟踪最后收到消息的时间(以识别消息的结束),_conditionStatisfied
告诉我是否已经获得了数据:
private DateTime lastMessageReceivedDateTime;
private bool _conditionSatisfied;
public Task<bool> CheckSubscription(IThirdParyCode connection)
{
var subscription = connection.Subscribe(async (obj, args) =>
{
lastMessageReceivedDateTime = DateTime.Now;
if(args.Message.foo == 5)
{
_conditionSatisfied = true;
}
});
while (lastMessageReceivedDateTime.AddSeconds(1) > DateTime.Now && !_conditionSatisfied)
{
Thread.Sleep(500);
}
subscription?.Unsubscribe();
return _activityCheckSatisfied;
}
这是有效的,但我想知道是否有更好的解决方案。
注意:我不能简单地等待async方法,因为它永远不会返回/完成,直到我取消订阅。
更多信息:connection
的类型为IStanConnection
(来自NATS),Subscribe
的签名为:
IStanSubscription Subscribe(string subject, StanSubscriptionOptions options,
EventHandler<StanMsgHandlerArgs> handler);
我简化了签名,把重点放在我有问题的代码上。
根据您的代码示例,我可以假设如果在最后一条消息的一秒钟内没有新消息,则消息流结束。可以修改您的解决方案以消除活动等待循环,并将其替换为单个await
调用。它将基于两个任务:
- 第一个任务将跟踪成功完成(
_conditionSatisfied
在你的例子中),并将由TaskCompletionSource设置。SetResult - 第二个任务将尝试使用CancellationToken任务包装器(这种包装器的示例实现)和CancellationTokenSource的组合来标记流的结束。CancelAfter将尝试在每次迭代后延迟取消任务。这应该取代
lastMessageReceivedDateTime.AddSeconds(1) > DateTime.Now
条件。
修改后的代码应该像这样:
private CancellationTokenSource streamEndCancellation = new CancellationTokenSource();
private TaskCompletionSource<bool> satisfiedCompletionSource = new TaskCompletionSource<bool>();
public async Task<bool> CheckSubscription(IThirdParyCode connection)
{
// CancellationTokenTaskSource is in third-party library and not part of .NET
var streamEndSource = new CancellationTokenTaskSource<bool>(streamEndCancellation.Token);
var subscription = connection.Subscribe(async (obj, args) =>
{
lastMessageReceivedDateTime = DateTime.Now;
if(args.Message.foo == 5)
{
satisfiedCompletionSource.SetResult(true);
}
streamEndCancellation.CancelAfter(1000);
});
Task<bool> actualTask = await Task.WhenAny<bool>(satisfiedCompletionSource.Task, streamEndSource.Task);
subscription?.Unsubscribe();
return !actualTask.IsCanceled;
}