I am trying to fetch more than 1 million records in a microservice. I made a mistake: I keep having to increase the service's application memory in order to fetch that much data. Is there any way to fetch a huge amount of data without increasing the application memory?
2022-10-11T11:22:04.9898+08:00 [APP/PROC/WEB/0] [ERR] ResourceExhausted event: the JVM was unable to allocate memory from the heap. 2022-10-11T11:22:04.9898+08:00 [APP/PROC/WEB/0] [ERR] Resource exhausted! (1/0)
@Value("${batch-size}")
private int batchSize;

public void archiveTableRecords(JdbcTemplate sourceDbTemplate, JdbcTemplate targetDbTemplate,
        ArchiveConfigDTO archiveObj) {
    try {
        String sourceTable = archiveObj.getSourceTable();
        String archive_months = archiveObj.getArchiveCriteriaMonths();
        List<Object> primaryKeyValueList = new ArrayList<>();
        String compareDate1 = getCSTDateNew(archive_months);
        logger.info("Archive criteria date: {}", compareDate1);
        List<Map<String, Object>> sourceRecords = sourceDbTemplate
                .queryForList(ArchiveSQLQueries.buildSQLQueryToFetchSourceRecords(sourceTable), compareDate1);
        int sourceRecordsSize = sourceRecords.size();
        logger.info("Fetched {} {} record(s)", sourceRecordsSize, sourceTable);
        if (sourceRecordsSize > 0) {
            int recordsInserted = copySourceRecords(targetDbTemplate, archiveObj.getTargetTable(),
                    archiveObj.getPrimaryKeyColumn(), sourceRecords, primaryKeyValueList);
            if (recordsInserted > 0)
                deleteSourceRecords(sourceDbTemplate, sourceTable, archiveObj.getPrimaryKeyColumn(),
                        primaryKeyValueList);
        }
    } catch (Exception e) {
        // Pass the exception as the last argument (no placeholder) so SLF4J logs the stack trace
        logger.error("Exception in archiveTableRecords: {}", e.getMessage(), e);
    }
}
public static String buildSQLQueryToFetchSourceRecords(String sourceTable) {
    return "SELECT * FROM " + sourceTable + " WHERE update_dts <= ?";
}
public int copySourceRecords(JdbcTemplate targetDbTemplate, String targetTable, String primaryKeyColumn,
        List<Map<String, Object>> sourceRecords, List<Object> primaryKeyValueList) {
    logger.info("Copying records to {}", targetTable);
    int[][] insertResult = targetDbTemplate.batchUpdate(
            ArchiveSQLQueries.buildSQLTargetRecordInsertionQuery(targetTable, sourceRecords.get(0),
                    primaryKeyColumn),
            sourceRecords, batchSize, new ParameterizedPreparedStatementSetter<Map<String, Object>>() {
                @Override
                public void setValues(PreparedStatement ps, Map<String, Object> argument) throws SQLException {
                    int index = 1;
                    for (Entry<String, Object> obj : argument.entrySet()) {
                        if (obj.getKey().equals(primaryKeyColumn))
                            primaryKeyValueList.add(obj.getValue());
                        else
                            ps.setObject(index++, obj.getValue());
                    }
                }
            });
    int result = getSumOfArray(insertResult);
    logger.info("Inserted {} record(s) in {}", result, targetTable);
    return result;
}
I have tried the code above, but while fetching the data I still get this error.
Judging by the method names, you are copying records from one table to another. Use LIMIT and OFFSET to do it in batches. A simplified example:
SELECT *
FROM table
WHERE condition
ORDER BY column_which_ensures_last_added_data_is_fetched_last
LIMIT 5
OFFSET 0;
For the next batch, change the offset by the number of records already processed; see your database's documentation for more examples. ORDER BY a column that guarantees the oldest data comes first and the newest last, so that you never fetch the same row twice. If the primary key is an auto-incrementing id, it is a good choice.
If you need a count for logging, use the COUNT function, or sum the batch sizes to track how many entities have been processed.
SELECT COUNT(*)
FROM table
WHERE condition;
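For example, the count query could be built the same way as your buildSQLQueryToFetchSourceRecords and run through JdbcTemplate's queryForObject; the table name below is just a placeholder:

```java
public class CountQueryDemo {

    // Mirrors buildSQLQueryToFetchSourceRecords, but counts rows instead of fetching them.
    public static String buildCountQuery(String sourceTable) {
        return "SELECT COUNT(*) FROM " + sourceTable + " WHERE update_dts <= ?";
    }

    public static void main(String[] args) {
        // With JdbcTemplate this would be executed as:
        // Integer total = sourceDbTemplate.queryForObject(
        //         buildCountQuery(sourceTable), Integer.class, compareDate1);
        System.out.println(buildCountQuery("my_table"));
    }
}
```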
Pseudocode for the algorithm:
int processed = 0;
List<Map<String, Object>> list = // fetch using the query above, with OFFSET = processed
while (!list.isEmpty()) {
    // copy the contents of list to the target table
    // handle exceptions, data not inserted in the target, etc.
    processed += list.size();
    list = // fetch using the query above, with OFFSET = processed
}
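The loop above can be sketched as a runnable program. Here fetchBatch simulates the LIMIT/OFFSET query against an in-memory list; in real code it would be a JdbcTemplate queryForList call with the offset bound as a parameter. The names OffsetBatchDemo, fetchBatch, and archiveAll are illustrative, not from your codebase:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class OffsetBatchDemo {

    static final int BATCH_SIZE = 3;

    // Simulated table of 7 rows; a real implementation would run
    // "SELECT ... WHERE update_dts <= ? ORDER BY id LIMIT ? OFFSET ?" instead.
    static final List<Integer> TABLE =
            IntStream.rangeClosed(1, 7).boxed().collect(Collectors.toList());

    // Stand-in for the LIMIT/OFFSET query: at most `limit` rows starting at `offset`.
    static List<Integer> fetchBatch(int offset, int limit) {
        if (offset >= TABLE.size()) {
            return List.of();
        }
        return TABLE.subList(offset, Math.min(offset + limit, TABLE.size()));
    }

    public static int archiveAll() {
        int processed = 0;
        List<Integer> batch = fetchBatch(processed, BATCH_SIZE);
        while (!batch.isEmpty()) {
            // copy the batch to the target table here
            processed += batch.size();
            batch = fetchBatch(processed, BATCH_SIZE);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("Processed " + archiveAll() + " record(s)");
    }
}
```

One caution for your archiving case: if you delete each batch from the source table right after copying it, the remaining rows shift down, so the offset should stay at 0 rather than advance; advance the offset only if deletion happens after all batches are fetched.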