我们使用Rebus作为Sql server的队列系统。对于不同类型的消息,我们有几个收件人。每个消息可以由特定类型的几个工作者处理。一条消息应该只由一个worker处理/处理(第一个提取它的worker)。如果一个worker由于某种原因不能完成它,它会使用超时服务来延迟消息。
如果我理解正确的话,它会变成一个TimeoutRequest并放入超时表。当需要重新运行时,它会在作为原始消息重新引入队列之前变成TimeoutReply。
我们遇到的问题是,当它变成TimeoutReply时,所有的工作线程都将其拾取并创建原始消息。当超时时,一条原始消息将变成多条消息(与工作线程的数量一样多)。
我们的Rebus设置如下:
"服务器端":
var adapter = new BuiltinContainerAdapter();
Configure.With(adapter)
.Logging(l => l.Log4Net())
.Transport(t => t.UseSqlServerInOneWayClientMode(connectionString).EnsureTableIsCreated())
.CreateBus()
.Start();
return adapter;
"工人":
_adapter = new BuiltinContainerAdapter();
Configure.With(_adapter)
.Logging(l => l.Log4Net())
.Transport(t => t.UseSqlServer(_connectionString, _inputQueue, "error")
.EnsureTableIsCreated())
.Events(x => x.AfterMessage += ((bus, exception, message) => SendWorkerFinishedJob(exception, message)))
.Events(x => x.BeforeMessage += (bus, message) => SignalWorkerStartedJob(message))
.Behavior(x => x.SetMaxRetriesFor<Exception>(0))
.Timeouts(x => x.StoreInSqlServer(_connectionString, "timeouts").EnsureTableIsCreated())
.CreateBus().Start(numberOfWorkers);
任何帮助解决问题或提供理解是非常感激的!
我能想到的唯一原因是为什么你最终会有多个超时回复,因为每个worker都作为超时管理器,并且它们似乎共享相同的存储。
这样,由于超时管理器在查询到期超时时不使用任何类型的锁定或任何东西,因此它们最终可以抓取相同的到期超时,这反过来导致多个超时应答(如果存在竞争条件,但它不会被注意到,因为此SQL不会注意到一行是否实际被删除)。
我建议你要么a)为工人(例如_inputQueue + ".timeouts"
)使用单独的超时表,要么b)让所有工人使用外部超时管理器(即通过省略Timeouts(x => ...)
的东西并启动一个独立的专用超时管理器)。
在你的情况下,我想(a)是最简单的方法,因为它非常接近你现在的情况。
我自己更喜欢(b),通常每台托管Rebus端点的机器上有一个超时管理器。
请让我知道这是否解决了你的问题。
另外,我很想知道SQL传输是如何为您工作的:)