具有 Azure 存储队列的 NServiceBus 将无限期地处理没有标头的消息



我正在试验一个新的NServiceBus项目,该项目使用Azure存储队列进行消息传输和JSON序列化。我注意到,当我通过缺少 NServiceBus 标头的队列运行消息时,例如空的 JSON 消息:{ } 它将抛出以下警告消息:

2020-02-06 17:46:35.587 WARN  NServiceBus.Transport.AzureStorageQueues.MessagePump Azure Storage Queue transport failed pushing a message through pipeline
System.ArgumentNullException: Value cannot be null.
Parameter name: nativeMessageId
at NServiceBus.Transport.IncomingMessage..ctor(String nativeMessageId, Dictionary`2 headers, Byte[] body)
at NServiceBus.Transport.ErrorContext..ctor(Exception exception, Dictionary`2 headers, String transportMessageId, Byte[] body, TransportTransaction transportTransaction, Int32 immediateProcessingFailures)
at NServiceBus.Transport.AzureStorageQueues.ReceiveStrategy.CreateErrorContext(MessageRetrieved retrieved, MessageWrapper message, Exception ex, Byte[] body)
at NServiceBus.Transport.AzureStorageQueues.AtLeastOnceReceiveStrategy.<Receive>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.AzureStorageQueues.MessagePump.<InnerReceive>d__7.MoveNext()

在此之后,它似乎停止处理消息,但将其保留在队列中。然后,在等待配置的消息不可见期后,该消息再次在队列中可见,NServiceBus 将无限期地重复"警告并停止处理"过程。有没有办法改变 NServiceBus 处理此方案的方式,以便在它无法解析标头信息并且不尝试无限期处理消息时将消息抛到配置的错误队列?

NServiceBus Storage Queues 传输假设消息以正确的信封到达。如果未找到该信封,您将获得上面看到的异常。对于不是由 NServiceBus 构造或使用自定义信封构造的消息,请参阅此处的文档。简而言之,您需要一个自定义信封展开器。

自定义 unrapper(回调(负责的是反序列化消息并构造 NServiceBus 期望使用的MessageWrapper

var transport = endpointConfiguration.UseTransport<AzureStorageQueueTransport>();
transport.UnwrapMessagesWith(cloudQueueMessage =>
{
using (var stream = new MemoryStream(cloudQueueMessage.AsBytes))
using (var streamReader = new StreamReader(stream))
using (var textReader = new JsonTextReader(streamReader))
{
//try deserialize to a NServiceBus envelope first
var wrapper = jsonSerializer.Deserialize<MessageWrapper>(textReader);
if (wrapper.Id != null)
{
//this was a envelope message
return wrapper;
}
//this was a native message just return the body as is with no headers
return new MessageWrapper
{
Id = cloudQueueMessage.Id,
Headers = new Dictionary<string, string>(),
Body = cloudQueueMessage.AsBytes
};
}
});

最新更新