@Stateless和@Asynchronous EJB之间的JPA事务处理



我有一个无状态EJB,它将数据插入数据库,立即发送响应,并在最后一步调用异步EJB。异步EJB可以运行很长时间(我的意思是5-10分钟,比JPA事务超时时间长)。异步ejb需要读取(并处理)与无状态ejb持久化的记录树相同的记录树(仅读取)。

异步bean似乎试图在由状态EJB提交或插入(JPA)记录树之前读取记录树,因此异步bean看不到记录树。

无状态EJB:

@Stateless
public class ReceiverBean {
public void receiverOfIncomingRequest(data) {
long id = persistRequest(data);
sendResponseToJmsBasedOnIncomingData(data);
processorAsyncBean.calculate(id);
}
}
}

异步EJB:

@Stateless
public class ProcessorAsyncBean {
@Asynchronous
public void calculate(id) {
Data data = dao.getById(id); <- DATA IS ALLWAYS NULL HERE!
// the following method going to send
// data to external system via internet (TCP/IP)
Result result = doSomethingForLongWithData(data);
updateData(id, result);
}
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
public void updateData(id, result) {
dao.update(id, result);
}

也许我可以使用JMS队列向处理器bean发送带有ID的信号,而不是调用asyc-ejb(以及消息驱动的bean从数据库读取数据),但如果可能的话,我希望避免这种情况。

另一种解决方案可以是将整个记录树作为一个分离的JPA对象传递给处理器异步EJB,而不是从数据库中读回数据。

我能以某种方式使异步EJB在这种结构中正常工作吗?

-更新--

我在考虑使用Weblogic JMS。这里还有另一个问题。在负载大的情况下,当队列中有100000或更多的数据(这将是正常的),并且没有互联网连接时,队列中的所有数据都将失败。如果在通过互联网(通过doSomethingForLongWithData方法)发送数据期间出现该异常(或任何异常),则数据将根据Weblogic的redelivery-limitrepetitaion设置回滚到原始队列。此回滚事件将在托管服务器中的Weblogic上生成100000个或更多线程,以管理重新传递。大量新的后台进程可能会杀死服务器,或者至少会减慢服务器的速度。

我也可以使用IBM MQ,因为我们有MQ基础架构。MQ对Weblogic服务器没有这种影响,但MQ没有重新交付限制和延迟功能。因此,如果出现错误(回滚),消息将立即再次出现在MQ上,没有延迟,我构建了一个手动磨。catch条件下的Thread.sleep()不是EE应用中的解决方案,我想。。。

异步bean似乎在由状态EJB提交或插入(JPA)之前尝试读取记录树,因此异步bean看不到记录树。

这是bean管理事务的预期行为。您正在从具有自己事务上下文的EJB启动异步EJB。异步EJB从不使用调用方事务上下文(请参见EJB规范4.5.3)。只要您的持久性不使用事务隔离级别"读取未提交",就不会看到来自调用方的仍未提交的数据。

您必须考虑异步作业不会提交的情况(例如,应用程序服务器关闭或异常中止)。以下计算和更新是否至关重要?如果未成功执行或甚至未调用异步进程,异步进程是否可以恢复?

您可以考虑使用bean管理的事务,在调用异步EJB之前进行提交。或者,您可以将数据更新委托给具有新事务上下文的另一个EJB。这将在调用异步EJB之前提交。这通常适用于不挑剔的东西,缺失或失败。

使用持久性和事务性JMS消息以及死信队列具有可靠处理计算和更新的优势,即使在其间停止/启动应用程序服务器,或者在处理过程中出现临时错误。

您只需要调用带有事务标记的方法旁边的异步方法,这样在提交事务时就可以了。

例如,receiverOfIncomingRequest()方法的调用者可以添加

processorAsyncBean.calculate(id);

打电话到旁边。

更新:扩展示例

呼叫者MDB

@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public void onMessage(Message message) {
long id = receiverBean.receiverOfIncomingRequest(data);
processorAsyncBean.calculate(id);
}

ReceiverBean

@TransactionAttribute(TransactionAttributeType.REQUIRED)
public long receiverOfIncomingRequest(data) {
long id = persistRequest(data);
sendResponseToJmsBasedOnIncomingData(data);
return id;        
}

最新更新