刚刚从MassTransit 6.3.2升级到7.2.4。我正在使用.Net Core 5。
以下单元测试在升级前运行良好,但在升级后失败。
using var harness = new InMemoryTestHarness();
harness.Consumer(() => new MockedxxxService(), xxxEndPoint);
harness.Start().Wait();
IBus endpoint = harness.Bus;
var tasks = new System.Collections.Concurrent.ConcurrentBag<Task<string>>();
var result = Parallel.For(0, 100, index =>
{
var sut = new xxxRetriever(..., endpoint, ...);
tasks.Add(sut.Getxxx(paramx));
});
Assert.True(result.IsCompleted);
await Task.WhenAll(tasks);
xxxRetriever类中的代码如下所示。
public async Task<string> Getxxx(string paramx)
{
try
{
var serviceAddress = new Uri("queue:" + ...);
var xxxService = _busService.CreateRequestClient<xxxContract>(serviceAddress, TimeSpan.FromMilliseconds(xxx));
var xxxResponse = await xxxService.GetResponse<xxxResultContract>(new
{
...
}).ConfigureAwait(false);
....
}
}
端点作为IBus注入到类中。
被嘲笑的服务是这样的。
public class MockedxxxService : IConsumer<xxxContract>
{
public async Task Consume(ConsumeContext<xxxContract> context)
{
await context.RespondAsync<xxxResultContract>(new { ... } } });
}
}
当我们将任务数量限制在30个左右时,测试运行良好。但除此之外,它与消息";等待响应超时,RequestId:&";。
如有任何帮助,我们将不胜感激。
这可能与TPL中的争用有关,但这只是猜测。您可以配置增加总线上的并发限制,看看这是否有帮助:
harness.OnConfigureInMemoryBus += c =>
{
c.Host(h => h.TransportConcurrencyLimit = 100);
};
这是一个猜测,但它可能是相关的,因为它是特定于负载的。
显然,这应该在对硬度调用Start
之前调用(顺便说一下,应该是await
ed,而不是使用.Wait()
(。
经过大量调查,我们发现这是由.Net 5处理线程创建的方式引起的。请参阅ThreadPool.SetMinThreads不创建任何新线程
我们在测试之前启动了线程池,从而解决了这个问题。不确定为什么这在MassTransit 6下的.Net 5中正确工作,但现在它正在工作。