我正在尝试使用分区在多线程上运行JAVAEE7批处理。
我的批处理很简单:读取一堆随机数,使用 3 个线程写出它们的总和。
我的工作 XML
<job id="partition" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
version="1.0">
<step id="process" next="cleanup">
<chunk item-count="3">
<reader ref="partitionProcessIR">
<properties>
<property name="start" value="#{partitionPlan['start']}" />
<property name="end" value="#{partitionPlan['end']}" />
</properties>
</reader>
<processor ref="partitionProcessIP" />
<writer ref="partitionProcessIW" />
</chunk>
<partition>
<mapper ref="partitionMapperImpl" />
</partition>
</step>
<step id="cleanup">
<batchlet ref="partitionCleanupBatchlet"></batchlet>
</step>
</job>
My PartitionMapperImpl:
@Override
public PartitionPlan mapPartitions() throws Exception {
// TODO Auto-generated method stub
return new PartitionPlanImpl() {
@Override
public int getPartitions() {
return 3;
}
@Override
public int getThreads() {
return 3;
}
@Override
public Properties[] getPartitionProperties() {
int totalRecords = getTotalRecords();
int partItems = totalRecords / getPartitions();
int remainItems = totalRecords % getPartitions();
Properties[] props = new Properties[getPartitions()];
for (int i = 0; i < getPartitions(); i++) {
props[i] = new Properties();
props[i].setProperty("start", String.valueOf(i * partItems));
// if this is the last partition, add remaining items
if (i == getPartitions() - 1) {
props[i].setProperty("end", String.valueOf((i + 1) * partItems + remainItems));
} else {
props[i].setProperty("end", String.valueOf((i + 1) * partItems));
}
}
return props;
}
};
}
private int getTotalRecords() {
return 50;
}
我的读者:
@Override
public void open(Serializable checkpoint) throws Exception {
int start = new Integer(startProperty);
int end = new Integer(endProperty);
List<Integer> listNumber = new ArrayList<>();
for (int i = start; i < end; i++) {
int rand = (int) (Math.random() * 10);
listNumber.add(rand);
}
iterator = listNumber.iterator();
}
@Override
public Integer readItem() throws Exception {
if (iterator.hasNext()) {
return iterator.next();
}
// end read
return null;
}
我的处理器
@Override
public Integer processItem(Object arg0) throws Exception {
Integer rand = (Integer) arg0;
return rand;
}
我的作家
@Override
public void writeItems(List<Object> arg0) throws Exception {
int sum = 0;
for (Object object : arg0) {
Integer rand = (Integer) object;
sum += rand;
}
System.out.println(Thread.currentThread().getId() + " | SUM OF CHUNK: " + sum);
}
运行此批处理时,发生以下错误。 我猜这与在 derby 数据库中同时存储服务器检查点有关。
2017-03-02T15:22:45.955+0700|情報: 275 |块的总和:13 2017-03-02T15:22:45.958+0700|情報: 316 |块的总和:17 2017-03-02T15:23:05.971+0700|重大:读写循环失败 com.ibm.jbatch.container.exception.BatchContainerServiceException: 无法将 [进程] 的检查点数据保留在 com.ibm.jbatch.container.persistence.CheckpointManager.checkpoint(CheckpointManager.java:133) 在 com.ibm.jbatch.container.impl.ChunkStepControllerImpl.invokeChunk(ChunkStepControllerImpl.java:644) 在 com.ibm.jbatch.container.impl.ChunkStepControllerImpl.invokeCoreStep(ChunkStepControllerImpl.java:764) 在 com.ibm.jbatch.container.impl.BaseStepControllerImpl.execute(BaseStepControllerImpl.java:144) 在 com.ibm.jbatch.container.impl.ExecutionTransitioner.doExecutionLoop(ExecutionTransitioner.java:112) 在 com.ibm.jbatch.container.impl.JobThreadRootControllerImpl.originateExecutionOnThread(JobThreadRootControllerImpl.java:110) 在 com.ibm.jbatch.container.util.BatchWorkUnit.run(BatchWorkUnit.java:80) 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.glassfish.enterprise.concurrent.internal.ManagedFutureTask.run(ManagedFutureTask.java:141) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) at org.glassfish.enterprise.concurrent.ManagedThreadFactoryImpl$ManagedThread.run(ManagedThreadFactoryImpl.java:250) 由以下原因引起:com.ibm.jbatch.container.exception.PersistenceException: java.sql.SQLTransactionRollbackException: ????????????????????????????????????:锁定:行,检查点数据, (110,27) 正在等待 XID : {77885156, S} , APP, 选择 id, obj from 检查点数据,其中 id = ?授予 XID : {77885155, X} 锁定 : 行, 检查点数据, (110,28) 等待 XID : {77885155, S} , APP, 选择 id, obj from CHECKPOINTDATA 其中 id = ?授予 XID : {77885156, X} ????????XID: 77885156?在 fish.payara.jbatch.persistence.rdbms.JBatchJDBCPersistenceManager.queryCheckpointData(JBatchJDBCPersistenceManager.java:503) 在 fish.payara.jbatch.persistence.rdbms.JBatchJDBCPersistenceManager.updateCheckpointData(JBatchJDBCPersistenceManager.java:388) 在 fish.payara.jbatch.persistence.rdbms.LazyBootPersistenceManager.updateCheckpointData(LazyBootPersistenceManager.java:230) 在 com.ibm.jbatch.container.persistence.CheckpointManager.checkpoint(CheckpointManager.java:128) ...13 更多 原因: java.sql.SQLTransactionRollbackException: ????????????????????????????????????:锁定:行,检查点数据, (110,27) 正在等待 XID : {77885156, S} , APP, 选择 id, obj from 检查点数据,其中 id = ?授予 XID : {77885155, X} 锁定 : 行, 检查点数据, (110,28) 等待 XID : {77885155, S} , APP, 选择 id, obj from CHECKPOINTDATA 其中 id = ?授予 XID : {77885156, X} ????????XID: 77885156?在 org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown 来源)在 org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source) 在 org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown 来源)在 org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown 来源)在 org.apache.derby.impl.jdbc.EmbedConnection.handleException(Unknown 来源)在 org.apache.derby.impl.jdbc.ConnectionChild.handleException(Unknown 来源)在 org.apache.derby.impl.jdbc.EmbedResultSet.closeOnTransactionError(Unknown 来源)在 org.apache.derby.impl.jdbc.EmbedResultSet.movePosition(Unknown Source) at org.apache.derby.impl.jdbc.EmbedResultSet.next(Unknown Source) at com.sun.gjc.spi.base.ResultSetWrapper.next(ResultSetWrapper.java:103) 在 fish.payara.jbatch.persistence.rdbms.JBatchJDBCPersistenceManager.queryCheckpointData(JBatchJDBCPersistenceManager.java:498) ...16 更多 原因: java.sql.SQLException: ????????????????????????????????????:锁定:行,检查点数据, (110,27) 正在等待 XID : {77885156, S} , APP, 选择 id, obj from 检查点数据,其中 id = ?授予 XID : {77885155, X} 锁定 : 行, 检查点数据, (110,28) 等待 XID : {77885155, S} , APP, 选择 id, obj from CHECKPOINTDATA 其中 id = ?授予 XID : {77885156, X} ????????XID: 77885156?在 org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown 来源)在org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown 来源)...27 更多 由以下原因引起: 错误 40001: ????????????????????????????????????:锁定:行,检查点数据, (110,27) 正在等待 XID : {77885156, S} , APP, 选择 id, obj from 检查点数据,其中 id = ?授予 XID : {77885155, X} 锁定 : 行, 检查点数据, (110,28) 等待 XID : {77885155, S} , APP, 选择 id, obj from CHECKPOINTDATA 其中 id = ?授予 XID : {77885156, X} ????????XID: 77885156?在 org.apache.derby.iapi.error.StandardException.newException(Unknown 来源)在 org.apache.derby.impl.services.locks.Deadlock.buildException(Unknown 来源)在 org.apache.derby.impl.services.locks.ConcurrentLockSet.lockObject(Unknown 来源)在 org.apache.derby.impl.services.locks.ConcurrentLockSet.zeroDurationLockObject(Unknown 来源)在 org.apache.derby.impl.services.locks.AbstractPool.zeroDurationlockObject(Unknown 来源)在 org.apache.derby.impl.services.locks.ConcurrentPool.zeroDurationlockObject(Unknown 来源)在 org.apache.derby.impl.store.raw.xact.RowLocking2nohold.lockRecordForRead(Unknown 来源)在 org.apache.derby.impl.store.access.conglomerate.OpenConglomerate.lockPositionForRead(Unknown 来源)在 org.apache.derby.impl.store.access.conglomerate.GenericScanController.fetchRows(Unknown 来源)在 org.apache.derby.impl.store.access.heap.HeapScan.fetchNextGroup(Unknown 来源)在 org.apache.derby.impl.sql.execute.BulkTableScanResultSet.reloadArray(Unknown 来源)在 org.apache.derby.impl.sql.execute.BulkTableScanResultSet.getNextRowCore(Unknown 来源)在 org.apache.derby.impl.sql.execute.BasicNoPutResultSetImpl.getNextRow(Unknown 来源)...还有 20 个
您有任何想法如何解决此问题吗?
或者任何可以在 2 个以上线程上运行的示例都非常有帮助。
提前谢谢。
在我看来,您可能遇到了并发问题,例如死锁或锁定超时。(这有点难以分辨,因为您的异常信息在问题中有点乱码,而且我认为,因为 Derby 消息是用母语字符串和英语字符串混合打印的)。
您可以在此处找到一些用于诊断和了解并发数据库访问遇到这些问题的原因的策略:https://wiki.apache.org/db-derby/LockDebugging
看起来像一个 Payara 问题,来自堆栈跟踪中的这一行:
fish.payara.jbatch.persistence.rdbms.JBatchJDBCPersistenceManager.queryCheckpointData(JBatchJDBCPersistenceManager.java:503)
您可以尝试使用 GlassFish 运行您的应用程序,看看您是否遇到同样的问题。
或者,您可以将应用程序部署到 WildFly,其中包含 JBeret 作为批处理容器。如果您的应用程序是按照 JSR 352 规范编写的,那么它应该在任何符合 Java EE 7 的应用程序服务器中进行部署和运行。您可以将 WildFly 配置为将 jdbc 作业存储库与 Derby 或任何其他受支持的 DBMS(包括捆绑的 H2 数据库)一起使用。
如果您仍然陷入困境,我建议您跟进Payara项目。