Akka.Net 流 - 当缓冲区引发错误时,源停止拉取元素



我一直在玩流扩展包 Akka.Net,并在尝试组合缓冲区和限制方法时注意到此错误:

using (var system = ActorSystem.Create("test-system"))
using (var materializer = system.Materializer(GetSettings(system)))
{
int index = 0;
var sink = Sink.ActorRefWithAck<KeyValue>(
system.ActorOf<Writer>(), 
new OnInitMessage(), 
new OnAcknowledgeMessage(), 
OnComplete.Instance, 
exception => new OnError(exception));
ServiceBusSource
.Create(client, message =>
{
var json = new StreamReader(message.GetBody<Stream>(), Encoding.UTF8).ReadToEnd();
var result = JsonConvert.DeserializeObject<KeyValue>(json);
message.Complete();
return result;
})
.WithLogger(system, entity => $"{entity.Key} => {entity.Value}")
.Buffer(1, OverflowStrategy.Fail)
.Throttle(1, TimeSpan.FromSeconds(5), 3, ThrottleMode.Shaping)
.ToMaterialized(sink, Keep.Right)
.Run(materializer);
Console.ReadLine();
}

我正在使用Alpakka的ServiceBusSource 这些是我引用的软件包:

  • Akka.Streams: 1.3.1
  • Akka.Streams.Azure.ServiceBus: 0.1.0
  • WindowsAzure.ServiceBus: 4.1.3

故意让它失败,以便查看 BUT 的行为方式,缓冲区的策略失败后,流完成并且没有更多元素被拉取。

键值.cs

public class KeyValue
{
public int Id { get; set; }
public string Key { get; set; }
public string Value { get; set; }
public DateTime Produced { get; set; }
public DateTime Emitted { get; set; }
public override string ToString()
{
return $"[{Produced}] - [{Emitted}] => {Id} {Key}:{Value}";
}
}

获取设置方法:

ActorMaterializerSettings GetSettings(ActorSystem system)
{
return ActorMaterializerSettings.Create(system)
.WithSupervisionStrategy(cause =>
{
system.Log.Error(cause, "Failed");
return Directive.Resume;
});
}

有几种方法可以在流中处理错误 - 其中大多数在文档中进行了描述:

  1. 使用Recover从错误创建回退事件。
  2. 使用RecoverWithRetries允许在出错时重定向到不同的流。
  3. 使用Restart.WithBackoff在指数退避延迟后重建重试流。
  4. 使用WithSupervisionStrategy- 这是一个非常有限的选项,因为它仅适用于明确引用它的阶段(如文档中所述(。

您的情况是设计使然 - 当您使用OverflowStrategy.Fail时,这意味着一旦达到溢出,就会产生错误。大多数 akka 阶段的反应是在发生故障时立即关闭流。

最新更新