如何在批处理执行失败后重试未执行的语句



我需要用CSV中的数据更新一个表。所有数据都会在更新之前进行验证:验证方法(下面没有给出开关)会检查某些假设是否成立,并将对象"标记"为有效或无效。我已经测试了很多,它正按照我的意愿工作。

即便如此,我还是要保证,即使一个批次出现故障,所有语句都会被执行,这是我无法考虑的。如果发生这种情况,我希望跳过这个fail语句的批处理,并执行下一个语句。

public void updateTable(List<PersonBean> personList) {
    Connection connection = null;
    PreparedStatement ps = null;
    String updateDBPersonSQL = "UPDATE Person set merge_parent_id = ? WHERE id = ?";
    try {
        logger.info("DATA UPDATING STARTED");
        input = new FileInputStream("resources/propertiesFiles/applications.properties");
        properties.load(input);
        final int batchSize = Integer.parseInt(properties.getProperty("batchSize"));
        connection = DBConnection.getConnection();
        connection.setAutoCommit(false);
        int validObj = 0;
        ps = connection.prepareStatement(updateDBPersonSQL);
        for (int i = 0; i < personList.size(); i++) {
            PersonBean person = personList.get(i);
            if (person.getValidationStatus().equals("valid")) {
                ps.setInt(1, person.getMerge_parent_id());
                ps.setInt(2, person.getId());
                ps.addBatch();
                validObj++;
                if (validObj % batchSize == 0 && validObj != 0) {
                    ps.executeBatch();
                    connection.commit();
                    logger.info((batchSize) + " rows updated");
                } 
            }
        }
        int [] batchCount = ps.executeBatch();
        connection.commit();
        logger.info(batchCount.length + " rows updated");
        writeValidationStatusToCSV(personList);
    } catch (BatchUpdateException e) {
        int [] updateCount = e.getUpdateCounts();
            for (int i = 0; i < updateCount.length; i++) {
                if (updateCount[i] >= 0) {
                    logger.info(updateCount.length + " objects updated.");
                } else if (updateCount[i] == Statement.EXECUTE_FAILED) {
                    ?????
                }
            }
        logger.error(updateCount.length);
        logger.error("BatchUpdateException: " + e);
        logger.error("getNextException: " + e.getNextException());
        try {
            connection.rollback();
        } catch (SQLException e1) {
            logger.error("Rollback error: " + e1, e1);
        }
    } finally {
        if (ps!= null) {
            try {
                ps.close();
            } catch (SQLException e) {
                logger.info(e);
            }
        }
    }
    logger.info("DATA UPDATING FINISHED");
}

我看到了很多关于如何处理异常的材料,但没有一个解释或指导我如何重试下一个语句,也就是说,如何执行下一批。

我该如何做到这一点?

编辑:我正在使用Postgresql

我设法通过用trycatch语句包围批处理执行来重试下一批处理。通过这种方式,我可以捕获BatchUpdateException并调用continue语句。

try {
ps.executeBatch();
connection.commit();
/*Some more code*/
} catch (BatchUpdateException e) {
connection.rollback();
/*Some more code*/
continue;
}

我还使用了一些控制逻辑来"标记"已经执行的语句和批处理,并将其记录下来,这样在某些语句失败时更容易进行故障排除。

这是完整的代码:

public void updateTable(List<PersonBean> personList) throws Exception {
logger.info("TABLE UPDATE STARTED");
List <PersonBean> personListValidated = createValidStmtList(personList);
Connection connection = null;
PreparedStatement ps = null;
String updatePersonSQL = "UPDATE Person SET merge_parent_id = ? WHERE id = ?";
input = new FileInputStream("resources/propertiesFiles/applications.properties");
properties.load(input);
final int batchSize = Integer.parseInt(properties.getProperty("batchSize"));
/*A list was used to "flag" the batches that were already executed. BatchStatus objs have only two parameters, number (incremented as the batches are being executed) and status (success or fail).*/
List <BatchStatus> batchStatusList = new ArrayList<BatchStatus>();
 /*This variables will be used to help flag the batches and statements that were already executed.*/
int batchCount = 0;
int stmtAddedToBatchCount = 0;
try {
    connection = DBConnection.getConnection();
    connection.setAutoCommit(false);
    ps = connection.prepareStatement(updatePersonSQL);
    /*personListValidated contains the objects that will be updated in the table. Instead of doing the validation on the update method, I decomposed
     * this part in other 2 methods, making it easier to control of the statements added to the batch.
     */
    for (int i = 0; i < personListValidated.size(); i++) {
        PersonBean personValid = personListValidated.get(i);
        ps.setInt(1, personValid.getMerge_parent_id());
        ps.setInt(2, personValid.getId());
        ps.addBatch();
        personValid.setToBatch("true");
        stmtAddedToBatchCount++;
        logger.info("Row added to batch (count: " + stmtAddedToBatchCount + ")");
        if (stmtAddedToBatchCount % batchSize == 0) {
            batchCount++;
            try {
                ps.executeBatch();
                connection.commit();
                for (int j = stmtAddedToBatchCount - batchSize; j < stmtAddedToBatchCount; j++){
                    personValid = personListValidated.get(j);
                    personValid.setValidationStatus("success");
                }
                BatchStatus batchStatusObj = new BatchStatus(batchCount, "sucess");
                batchStatusList.add(batchStatusObj);   
                logger.info(batchStatusList.get(batchCount - 1));
            } catch (BatchUpdateException e) {
                connection.rollback();
                for (int j = stmtAddedToBatchCount - batchSize; j < stmtAddedToBatchCount; j++){
                    personValid = personListValidated.get(j);
                    personValid.setValidationStatus("fail");
                }
                BatchStatus batchStatusObj = new BatchStatus(batchCount, "fail");
                batchStatusList.add(batchStatusObj);
                logger.info(batchStatusList.get(batchCount - 1));
                logger.error("Bacth execution fail: " + e, e);
                continue;
            }
        }
    }
} catch (SQLException e) {
    logger.error(e, e);
}
int[] lastBatchCount = null;
/*Try and catch to handle the statements executed on the last batch*/
try {
    lastBatchCount = ps.executeBatch();
    connection.commit();
    for (int j = batchStatusList.size() * batchSize; j < stmtAddedToBatchCount; j++){
        PersonBean personValid = personListValidated.get(j);
        personValid.setValidationStatus("success");
    }
    logger.info(lastBatchCount.length + " rows inserted on the last batch");
    logger.info("Last batch excuted");
} catch (BatchUpdateException e) {
    connection.rollback();
    for (int j = batchStatusList.size() * batchSize; j < stmtAddedToBatchCount; j++){
        PersonBean personValid = personListValidated.get(j);
        personValid.setValidationStatus("fail");
    }
    logger.error("Last batch fail to execute: " + e, e);
}
writeValidationStatusToCSV(personList);
logger.info("TABLE UPDATE FINISHED");

}

最新更新