使用任务执行器的事务轮询器将数据库资源用于排队的消息



我有一个服务激活器,它使用轮询器从通道中提取消息。通道有一个队列,该队列由数据库的持久存储支持。轮询器还配置了一个任务执行器,以便为来自通道的消息处理添加一些并发性。

任务执行器配置有队列容量。

由于轮询器从数据库的通道中检索消息,并且这被配置为事务性的,那么如果任务执行器没有更多可用的线程,那么在任务执行器中排队的消息的事务会发生什么。任务执行器上对线程的请求是排队的,由于这些消息没有自己的线程,事务会发生什么?我假设轮询器从持久通道存储中删除由任务执行器排队的消息将被提交。那么,如果服务器在任务执行器中排队的可运行程序出现故障,它们将丢失?

由于事务性持久通道队列的理念是确保在服务器宕机的情况下不会丢失消息,因此如何处理排队消息(在任务执行器中)在通道数据库支持的队列/存储中的活动事务?

<bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="channelServerDataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
<property name="region" value="${user.name}_${channel.queue.region:default}"/>
<property name="usingIdCache" value="false"/>
</bean> 
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
<int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/> 
</int:transaction-synchronization-factory>
<int:channel id="transacitonAsyncServiceQueue">
<int:queue message-store="store"/> 
<!--  <int:queue/>  --> 
</int:channel>
<bean id="rxPollingTrigger" class="org.springframework.scheduling.support.PeriodicTrigger">
<constructor-arg value="500"/>
<constructor-arg value="MILLISECONDS"/>
<property name = "initialDelay" value = "30000"/> 
<!-- initialDelay important to ensure channel doesnt start processing before the datasources have been initialised becuase we
now persist transactions in the queue, at startup (restart) there maybe some ready to go which get processed before the
connection pools have been created which happens when the servlet is first hit -->
</bean> 
<int:service-activator ref="asyncChannelReceiver" method="processMessage" input-channel="transacitonAsyncServiceQueue">
<int:poller trigger="rxPollingTrigger" max-messages-per-poll="20"  task-executor="taskExecutor" receive-timeout="400">
<int:transactional transaction-manager="transactionManagerAsyncChannel" /> 
</int:poller>
<int:request-handler-advice-chain>
<ref bean="databaseSessionContext" />
</int:request-handler-advice-chain>
</int:service-activator>
<task:executor id="taskExecutor" pool-size="100-200" queue-capacity="200" keep-alive="1" rejection-policy="CALLER_RUNS" />  

。。。那么,如果任务执行器没有更多可用的线程,那么在任务执行器中排队的消息的事务会发生什么。任务执行器上对线程的请求是排队的,由于这些消息没有自己的线程,事务会发生什么?

它不是这样工作的——直到执行任务(包括通道中的receive()),事务才会启动。

轮询器安排任务、任务启动、事务启动、工作进行、事务提交/回滚。

请参阅AbstractPollingEndpoint-pollingTask.call()是事务开始的地方。

最新更新