多服务器环境下的并行处理,Java



我有一个系统,它接收来自提供者的大量消息(通过JMS(。这些消息目前最多由4台服务器使用。提供者在同一毫秒内将其中一些消息加倍发送。。。有些消息是相互依赖的。例如,一条消息包含客户信息,另一条消息则包含他的产品。

目前,该软件部署在多达4台服务器上,可以监听队列并处理消息。处理意味着将数据映射并存储到我们的数据库中,该数据库是Oracle或PostGres数据库(取决于安装情况,但始终是关系数据库(。并发修改是通过乐观锁定检测到的,我们有一个通用的重试机制,可以在这些问题发生时修复这些问题。所以更新没有什么大麻烦。

但是插入是个问题。。。例如,我们在同一毫秒内收到两条客户消息…软件检查客户是否存在。。。如果不是,它会创建它……但如果同时发生这种情况,他们都会尝试创建客户。我们通过DB约束(customerNumber必须是唯一的(解决了这个问题,或者在更复杂的情况下(DB约束不合适(,通过对数据库表进行悲观锁定(在锁定表中生成了一个插入条目,另一个插入线程无法创建该条目,并且知道插入正在进行并停止处理(。到目前为止,这是有效的。。。我想这不是最好、最漂亮的解决方案,但它奏效了。

问题是我们的处理负载增加了,我们发现插入重复数据的问题越来越多。。。这会给我们的其他业务逻辑带来很多问题,必须加以预防。。。目前的方法是通过使用锁定数据库表来使用悲观锁定来保护这些方法。。。

目前我不确定我们是否朝着正确的方向前进,可能是我们的设计从一开始就错了。。。有人能为我提供这个问题的其他想法吗?可能有一些设计模式有优点也有缺点?

我们目前的替代方案是对消息进行分组和负载平衡(例如使用KAFKA(。这应该确保一个客户的消息由一个线程处理=>因此不可能同时插入。对此有何看法?

环境:Java(EJB(、Wildfly、Oracle/PostGre DB

其中一个选项是将任务分组,以便1台服务器处理100个请求,第二台服务器将处理接下来的200个请求。您甚至可以按客户名称对它们进行分组,这样两台服务器就不会同时与同一客户工作。

这可以在没有Kafka的情况下完成,只需使用普通的旧关系数据库。您只需要将任务插入数据库即可。然后,每个服务都可以像这样从表中进行SELECT(这是PG语法,Oracle可以通过小的更改来支持相同的语法(:

select * from tasks 
order by customer 
limit 100
for update skip locked

这允许:

  1. 成组批处理任务
  2. 按字段分组任务(如客户(
  3. 锁定进度记录(for update(
  4. 并允许其他服务跳过并进入下一批(skip locked(

但这确实需要一些延迟才能收集批次。另一方面,批处理通常更高效,因此您最终可能会更快地处理请求。

最新更新