MassTransit with AWS SQS: exception when publishing a dynamic message



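I'm using MassTransit with AWS SQS/SNS as the transport. I've now hit what should be a simple problem: adding a CorrelationId to published messages. Because I follow the recommendation to use interfaces as DTOs, I can't use the CorrelatedBy<Guid> interface, so I decided to add the __CorrelationId property directly to the message.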

I publish messages through the following wrapper:

    public class MessageBus : IMessageBus
    {
        // ... omitted for brevity

        public async Task Publish<T>(object message)
            where T : class
        {
            await this.publishEndpoint.Publish<T>(this.CreateCorrelatedMessage(message));
        }

        // Copies every public property of the message into an ExpandoObject
        // and adds the __CorrelationId member.
        private object CreateCorrelatedMessage(object message)
        {
            dynamic corellatedMessage = new ExpandoObject();
            foreach (var property in message.GetType().GetProperties())
            {
                ((IDictionary<string, object>)corellatedMessage).Add(property.Name, property.GetValue(message));
            }
            corellatedMessage.__CorrelationId = this.correlationIdProvider.CorrelationId;
            return corellatedMessage;
        }
    }

Here is how it is used:

    await this.messageBus.Publish<ObtainRequestToken>(new
    {
        Social = SocialEnum.Twitter,
        AppId = credentials.First(x => x.Name == "TwitterConsumerKey").Value,
        AppSecret = credentials.First(x => x.Name == "TwitterConsumerSecret").Value,
        ReturnUrl = returnUrl
    });

This works fine in a console application with the following bus registration:

    var busControl = Bus.Factory.CreateUsingAmazonSqs(cfg =>
    {
        cfg.Host("eu-west-1", h =>
        {
            h.AccessKey("xxx");
            h.SecretKey("xxx");
            h.Scope($"Local", scopeTopics: true);
        });
    });

But an ASP.NET Core application configured with

    services.AddMassTransit(cfg =>
    {
        cfg.UsingAmazonSqs((context, sqsCfg) =>
        {
            var amazonAccount = this.Configuration
                .GetSection("AmazonAccount")
                .Get<AmazonAccountConfig>();
            sqsCfg.Host(amazonAccount.Region, h =>
            {
                h.AccessKey(amazonAccount.KeyId);
                h.SecretKey(amazonAccount.SecretKey);
                h.Scope(this.Environment.EnvironmentName, scopeTopics: true);
            });
        });
        cfg.SetEndpointNameFormatter(new DefaultEndpointNameFormatter(this.Environment.EnvironmentName + "_", false));
    });
    services.AddMassTransitHostedService();

fails on publish with "Object reference not set to an instance of an object." and the following stack trace:

at MassTransit.Logging.EnabledDiagnosticSource.StartSendActivity[T](SendContext1 context, ValueTuple2[] tags)
at MassTransit.AmazonSqsTransport.Transport.TopicSendTransport.SendPipe1.<Send>d__5.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at GreenPipes.Agents.PipeContextSupervisor1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at GreenPipes.Agents.PipeContextSupervisor1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at GreenPipes.Agents.PipeContextSupervisor1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at MassTransit.Initializers.MessageInitializer2.<Send>d__9.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at MassTransit.Transports.PublishEndpoint.<>c__DisplayClass18_01.<<PublishInternal>g__PublishAsync|0>d.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Dynamic.UpdateDelegates.UpdateAndExecuteVoid1[T0](CallSite site, T0 arg0)
at xxx.Main.Api.Messaging.MessageBus.<Publish>d__51.MoveNext() in D:repoprojectsxxxmainapixxx.Main.ApiMessagingMessageBus.cs:line 34 

StartSendActivity looks like a simple diagnostic method, and I can't figure out what could be failing inside it.

You can't send object or other types like that with MassTransit. Messages must be reference types.
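
One way to act on this, offered only as a sketch: leave the anonymous values object untouched so that T remains the message type, and set the correlation id on the publish context instead of copying everything into an ExpandoObject. The class name CorrelatedMessageBus and the Func<Guid> parameter (standing in for correlationIdProvider from the question) are hypothetical. The sketch assumes the MassTransit package is referenced and that its Publish<T>(object values, Action<PublishContext<T>> callback) extension method is available in the version in use. Whether this removes the exception in the ASP.NET Core host is not confirmed here.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical wrapper: it does not rebuild the message as an ExpandoObject.
// The values object only initializes a message of type T.
public class CorrelatedMessageBus
{
    private readonly IPublishEndpoint publishEndpoint;

    // Stands in for correlationIdProvider.CorrelationId from the question.
    private readonly Func<Guid> correlationIdSource;

    public CorrelatedMessageBus(IPublishEndpoint publishEndpoint, Func<Guid> correlationIdSource)
    {
        this.publishEndpoint = publishEndpoint;
        this.correlationIdSource = correlationIdSource;
    }

    public Task Publish<T>(object values)
        where T : class
    {
        Guid correlationId = this.correlationIdSource();

        // Assumed extension overload: Publish<T>(object values, Action<PublishContext<T>>).
        // The callback runs against the publish context before the message is sent.
        return this.publishEndpoint.Publish<T>(values, context =>
        {
            context.CorrelationId = correlationId;
        });
    }
}
```

The call site from the question would stay the same, for example Publish<ObtainRequestToken>(new { ... }), with the interface ObtainRequestToken as the message type and the anonymous object supplying its property values.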
