缓解来自并发消息系统的DB WRITE争用



请注意:尽管我在这里的问题特别涉及Camel(2.11.0)和ActiveMQ(5.8.0),但它实际上是关于并发消息传递解决方案的正确设计,任何有强大消息传递和/或并发经验的人都可以回答。

我有一个Camel路由,它从ActiveMQ队列(myQueue)中读取消息,并将它们发送到bean(processorBean)进行处理:

<camelContext id="my-camel-context" xmlns="http://camel.apache.org/schema/spring">
<endpoint id="myQueue" uri="myBroker01:queue:myQueue" />
<route id="my-route">
<from ref="myQueue" />
<to uri="bean:processorBean?method=process" /> 
</route>
</camelContext>

和:

public class ProcessorBean {
public void process(Exchange exchange) {
String messageJSON = (String)exchange.getIn().getBody();
// Example: now messageID might be "12345"
String messageID = parseJSON(messageJSON, "messageID");
// Look up DB records based on this messageID.
// The same messageID will *always* return the same list of widgets.
List<Widget> widgets = dao.getWidgetsByMessageID(messageID);
// Make updates to widgets.
for(Widget widget : widgets) {
widget.setFizz(true);
widget.setBuzz("Yahtzee!!!");
}
// Persist all updates to the widget list.
dao.updateAll(widgets);
}
}

这个bean使用消息的ID(StringmessageID字段)在DB中查找一堆记录,对它们进行更改,然后保存它们。注意消息从我无法控制的外部进程到达myQueue也是非常重要的。换句话说,我无法阻止具有相同messageID值的消息(以下简称"重复"或重复消息)出现在线程上。因此,这个外部进程可以在myQueue上发送1000条消息,其中20条消息都可以具有messageID=12345

目前,我只有1个Camel消费者配置为正在运行(因此它是"单线程"的)。因此,当重复出现时,目前没有任何危害(除了可能出现不必要的性能问题)。每个消息都会被处理,一次处理一个,如果有20个消息具有相同的messageID,那么,相同的DB记录会一次又一次地得到相同的(不必要的)更新。当然,这对性能不利,但它不会产生"坏数据"、脏写入、我们的产品竞争条件等。

我现在想在等式中添加更多的Camel消费者线程,这样可能会有10个消费者线程全部读取myQueue

显然,现在DB中存在WRITE争用的可能性。假设myQueue上有两条消息,并且都有messageID=12345。一个Camel使用者线程读取第一条消息,另一个线程同时或大约读取第二条消息。每个线程将其消息路由到其自己的processorBean的副本/版本。两个processorBean实例大约在同一时间执行,使用messageID从DB中读取相同的记录,在内存中对它们执行相同的操作,然后调用dao.updateAll(...)同时写入对相同记录的更改。如果两个线程同时更新相同的DB记录,就会出现争用。

另一个重要的注意事项是,在这种情况下,更改DB(由另一个团队控制)以实现分片、乐观锁定等不是一个选项(背景故事太长)。

我的问题是:在这种情况下,Java层可以做些什么来缓解WRITE争用必须从应用程序内部处理WRITE争用。想法?

您可以使用类似的全局锁

synchronized(ProcessorBean.class) {
// Persist all updates to the widget list.
dao.updateAll(widgets);
}

最新更新