公共交通发行商赢得了对队列的自定义绑定



我想将消息发布到不是公共交通库的客户端,我想为我的实体自定义交换点的命名,并将该交换点与队列进行自定义连接,告诉我如何做到这一点?

我试了一下,但不起作用

启动

public void ConfigureServices(IServiceCollection services)
{
services.AddMassTransit(busConfig =>
{
busConfig.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", 5672, "/", configurator =>
{
configurator.Username("guest");
configurator.Password("guest");
});

cfg.UsePrometheusMetrics(serviceName: "publisher_service");
cfg.ConfigurePublish(x =>
{
x.UseExecute(publishContext =>
{
publishContext.Serializer =
new RawJsonMessageSerializer(RawJsonSerializerOptions.AnyMessageType);
publishContext.ContentType.MediaType = "application/json";
});
});
cfg.Message<Message>(configurator => configurator.SetEntityName("mass_transit_exchange"));

cfg.Publish<Message>(configurator =>
{   
configurator.BindQueue("mass_transit_exchange", "testQUEUE", bindingConfigurator =>
{
bindingConfigurator.ExchangeType = ExchangeType.Direct;
bindingConfigurator.RoutingKey = "key";
});
});
});
});
services.AddMassTransitHostedService();
services.AddControllers();
}

控制器方法

[HttpPost]
public async Task<ActionResult> SentMessage([FromBody] Message message, CancellationToken cancellationToken)
{
message.CreatedOn = DateTimeOffset.Now;
await _endpoint.Publish<Message>(message,
context =>
{
context.Durable = true;
var a = context.Headers.GetAll();
foreach (var keyValuePair in a)
{
context.Headers.Set(keyValuePair.Key, null);
}
context.MessageId = null;
context.TimeToLive = TimeSpan.FromMilliseconds(30000);
},
cancellationToken);
_logger.LogInformation($"message was sent: {JsonConvert.SerializeObject(message, Formatting.Indented)}");
return Ok();
}

我想在publisher中创建这个拓扑拓扑

错误

MassTransit.RabbitMqTransport.RabbitMqAddressException: The entity name must be a sequence of these characters: letters, digits, hyphen, underscore, period, or colon.
at MassTransit.RabbitMqTransport.Topology.RabbitMqEntityNameValidator.ThrowIfInvalidEntityName(String name)
at MassTransit.RabbitMqTransport.RabbitMqEndpointAddress..ctor(Uri hostAddress, Uri address)
at MassTransit.RabbitMqTransport.Integration.ConnectionContextSupervisor.NormalizeAddress(Uri address)
at MassTransit.RabbitMqTransport.Transport.RabbitMqSendTransportProvider.NormalizeAddress(Uri address)
at MassTransit.Transports.SendEndpointProvider.GetSendEndpoint(Uri address)
at MassTransit.Transports.ReceiveEndpoint.GetSendEndpoint(Uri address)
at MassTransit.MassTransitBus.MassTransit.ISendEndpointProvider.GetSendEndpoint(Uri address)
at MassTransit.Scoping.ScopedSendEndpointProvider`1.MassTransit.ISendEndpointProvider.GetSendEndpoint(Uri address)
at publisherMassTransit.Controllers.ApiController.SentMessage(Message message, CancellationToken cancellationToken) in C:UsersqualitaDesktopexamplespublisherMassTransitControllersApiController.cs:line 32
at lambda_method(Closure , Object )
at Microsoft.Extensions.Internal.ObjectMethodExecutorAwaitable.Awaiter.GetResult()
at Microsoft.AspNetCore.Mvc.Infrastructure.ActionMethodExecutor.TaskOfActionResultExecutor.Execute(IActionResultTypeMapper mapper, ObjectMethodExecutor executor, Object controller, Object[] arguments)
at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeActionMethodAsync>g__Awaited|12_0(ControllerActionInvoker invoker, ValueTask`1 actionResultValueTask)
at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeNextActionFilterAsync>g__Awaited|10_0(ControllerActionInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Rethrow(ActionExecutedContextSealed context)
at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)
at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.InvokeInnerFilterAsync()
--- End of stack trace from previous location where exception was thrown ---
at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeFilterPipelineAsync>g__Awaited|19_0(ResourceInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeAsync>g__Awaited|17_0(ResourceInvoker invoker, Task task, IDisposable scope)
at Microsoft.AspNetCore.Routing.EndpointMiddleware.<Invoke>g__AwaitRequestTask|6_0(Endpoint endpoint, Task requestTask, ILogger logger)
at Microsoft.AspNetCore.Diagnostics.DeveloperExceptionPageMiddleware.Invoke(HttpContext context)
HEADERS
=======
Connection: keep-alive
Content-Type: application/json
Accept: */*
Accept-Encoding: gzip, deflate, br
Host: localhost:5000
User-Agent: PostmanRuntime/7.28.4
Content-Length: 29
Postman-Token: f93cd39a-0757-4929-9c1a-4bc63393f1cf

您可以使用原始JSON序列化程序,只需要为发布的消息配置交换类型。

public void ConfigureServices(IServiceCollection services)
{
services.AddMassTransit(busConfig =>
{
busConfig.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", 5672, "/", configurator =>
{
configurator.Username("guest");
configurator.Password("guest");
});
cfg.UseRawJsonSerializer();

cfg.UsePrometheusMetrics(serviceName: "publisher_service");
});
});
services.AddMassTransitHostedService();
}

与其把所有令人困惑的发布拓扑连接起来,只需发送以下消息:

public class Controller 
{
readonly ISendEndpointProvider _sendEndpointProvider;
public Controller(ISendEndpointProvider sendEndpointProvider)
{
_sendEndpointProvider = sendEndpointProvider;
}
[HttpPost]
public async Task<ActionResult> SentMessage([FromBody] Message message, CancellationToken cancellationToken)
{
message.CreatedOn = DateTimeOffset.Now;
var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri('queue:name&type=direct'));
await endpoint.Send<Message>(message,
context =>
{
var a = context.Headers.GetAll();
foreach (var keyValuePair in a)
{
context.Headers.Set(keyValuePair.Key, null);
}
context.MessageId = null;
context.TimeToLive = TimeSpan.FromMilliseconds(30000);
},
cancellationToken);
_logger.LogInformation($"message was sent: {JsonConvert.SerializeObject(message, Formatting.Indented)}");
return Ok();
}
}

或者类似的东西。您可以根据需要指定其他查询字符串参数来调整交换类型。

最新更新