如何从方法内部和外部中断同步订阅?



问题:我订阅了一个永无止境的消息传递服务,我的代码需要检查是否有消息满足条件,如果满足,则在处理所有消息并返回true之前关闭订阅。如果我已经处理了所有消息并且不满足条件,那么我需要关闭订阅并返回false。

例如:foo = 5:

message dataset early success :
msg1: foo=1
msg2: foo=2
msg3: foo=5 <= condition satisfied, return true and stop processing
msg4: foo=6
message dataset failure :
msg1: foo=1
msg2: foo=2
msg3: foo=3 
msg4: foo=4 <= no more messages, return false and stop processing

我使用的订阅有一个同步方法,我必须传递一个async EventHandler。以下是我的功能代码,适用于这两种情况,lastMessageReceivedDateTime跟踪最后收到消息的时间(以识别消息的结束),_conditionStatisfied告诉我是否已经获得了数据:

private DateTime lastMessageReceivedDateTime;
private bool _conditionSatisfied;
public Task<bool> CheckSubscription(IThirdParyCode connection)
{
var subscription = connection.Subscribe(async (obj, args) =>
{
lastMessageReceivedDateTime = DateTime.Now;
if(args.Message.foo == 5)
{
_conditionSatisfied = true;
}
});
while (lastMessageReceivedDateTime.AddSeconds(1) > DateTime.Now  && !_conditionSatisfied)
{
Thread.Sleep(500);
}
subscription?.Unsubscribe();
return _activityCheckSatisfied;
}

这是有效的,但我想知道是否有更好的解决方案。

注意:我不能简单地等待async方法,因为它永远不会返回/完成,直到我取消订阅。

更多信息:connection的类型为IStanConnection(来自NATS),Subscribe的签名为:

IStanSubscription Subscribe(string subject, StanSubscriptionOptions options,
EventHandler<StanMsgHandlerArgs> handler);

我简化了签名,把重点放在我有问题的代码上。

根据您的代码示例,我可以假设如果在最后一条消息的一秒钟内没有新消息,则消息流结束。可以修改您的解决方案以消除活动等待循环,并将其替换为单个await调用。它将基于两个任务:

  1. 第一个任务将跟踪成功完成(_conditionSatisfied在你的例子中),并将由TaskCompletionSource设置。SetResult
  2. 第二个任务将尝试使用CancellationToken任务包装器(这种包装器的示例实现)和CancellationTokenSource的组合来标记流的结束。CancelAfter将尝试在每次迭代后延迟取消任务。这应该取代lastMessageReceivedDateTime.AddSeconds(1) > DateTime.Now条件。

修改后的代码应该像这样:

private CancellationTokenSource streamEndCancellation = new CancellationTokenSource();
private TaskCompletionSource<bool> satisfiedCompletionSource = new TaskCompletionSource<bool>();
public async Task<bool> CheckSubscription(IThirdParyCode connection)
{
// CancellationTokenTaskSource is in third-party library and not part of .NET
var streamEndSource = new CancellationTokenTaskSource<bool>(streamEndCancellation.Token);
var subscription = connection.Subscribe(async (obj, args) =>
{
lastMessageReceivedDateTime = DateTime.Now;
if(args.Message.foo == 5)
{
satisfiedCompletionSource.SetResult(true);
}
streamEndCancellation.CancelAfter(1000);
});
Task<bool> actualTask = await Task.WhenAny<bool>(satisfiedCompletionSource.Task, streamEndSource.Task);

subscription?.Unsubscribe();
return !actualTask.IsCanceled;
}

最新更新