Spring Batch 使用多个数据源时写入缓慢(Spring Batch Slow Write with Multiple DataSources)



我有一个spring批处理应用程序,从一个文件加载5M记录到SQL Server数据库。根据国家代码,我有不同的数据源。当我使用带有@Primary注释的单个数据源时,spring批处理写入器在5分钟内写入5M条记录。但是,当我使用@Bean注释给出多个数据源并使用非主要数据源将文件数据写入数据库时,性能变得非常慢,对于相同的5M记录需要大约15分钟。谁能解释一下Spring Batch在主数据源和其他数据源上的行为是否不同?

@Repository
@ComponentScan(basePackages = "com.abc.extract")
@EnableBatchProcessing
@EnableTransactionManagement
@Slf4j
public class DataSourceConfig {

// Explicit no-arg constructor; redundant (the compiler generates one), kept for clarity.
public DataSourceConfig() {

}

// NOTE(review): this class is annotated @Repository but defines @Bean factory
// methods and batch steps — @Configuration is the conventional annotation here.

// Country-specific database configuration provider (resolved via CCM).
@Autowired
private CCMConfiguration ccmConfig;

// Notification/alerting hook used when data source creation fails.
@Autowired
SpotlightConfig spotlight;

// NOTE(review): datasourcesVal and datasourcesVal1 are declared AGAIN below as
// public @Autowired fields — duplicate field names in one class will not compile.
// Remove one set (prefer the @Qualifier-injected pair over these mutable holders).
private DataSource datasourcesVal = null;

private DataSource datasourcesVal1 = null;

// Declared but never populated in the visible code — presumably a leftover from an
// earlier country->DataSource routing design; TODO confirm and remove if unused.
private Map<String,DataSource> datasourcesMap = null;

// Default country code from configuration; overwritten at runtime from job
// parameters in ExtractNLoadItemOnHand.
@Value("${app.country.code}")
String countryCode;

/**
 * Primary HikariCP data source for the configured country code.
 *
 * <p>Pool settings (URL, credentials, pool size, timeouts) come from the
 * country-specific SQL Server configuration resolved through CCM. On failure a
 * spotlight notification is sent and {@code null} may be returned — callers
 * (Spring) will then fail on first use.
 */
@Primary
@Bean("batchprimary")
public HikariDataSource hikariDataSource(@Value("${app.country.code}")
String countryCode1) throws IllegalBlockSizeException, NoSuchPaddingException, BadPaddingException,
NoSuchAlgorithmException, InvalidKeyException {
    log.info("Contrycode:{}", countryCode1);
    SqlServerConfiguration dbConfiguration = ccmConfig.getSqlServerDatabaseConfig(countryCode1);
    HikariDataSource ds = null;
    try {
        final String password = dbConfiguration.getDataBasePwd();
        final String username = dbConfiguration.getDataBaseUserName();
        ds = (HikariDataSource) DataSourceBuilder.create().type(HikariDataSource.class).build();
        ds.setUsername(username);
        ds.setPassword(password);
        ds.setDriverClassName(dbConfiguration.getDataBaseDriver());
        ds.setJdbcUrl(dbConfiguration.getDataBaseUrl());
        ds.setConnectionTestQuery("SELECT 1");
        ds.setConnectionTimeout(dbConfiguration.getConnectionTimeout());
        ds.setIdleTimeout(dbConfiguration.getIdleTimeout());
        ds.setMaximumPoolSize(dbConfiguration.getHikariPoolSize());
        ds.setMaxLifetime(dbConfiguration.getMaxLifetime());
        ds.setLeakDetectionThreshold(dbConfiguration.getLeakDetectionThreshold());
        ds.setPoolName(dbConfiguration.getPoolName());
        this.datasourcesVal = ds;
    } catch (Exception ex) {
        spotlight.sendNotification(ex, "Critical: Failed to establish Database connection");
        // Pass the throwable itself so the stack trace is logged, not just the message.
        log.error("Primary data source creation failed", ex);
    } finally {
        // BUG FIX: the original condition `!Objects.nonNull(ds) || ds.isClosed()`
        // dereferenced ds with ds.close() even when ds was null, throwing an NPE
        // from the finally block. Null-check before closing.
        if (ds == null || ds.isClosed()) {
            if (ds != null) {
                ds.close();
            }
            spotlight.sendNotification(new NullPointerException("Primary data source creation failed"),
                    "Critical: Failed to establish Database connection");
        }
    }
    return ds;
}



@Bean("batchprimary1")
public HikariDataSource hikariDataSource1(@Value("${app.country.code}")
String countryCode1) throws IllegalBlockSizeException, NoSuchPaddingException, BadPaddingException,
NoSuchAlgorithmException, InvalidKeyException {
System.out.pri
ntln("Contrycode:"+countryCode1);
SqlServerConfiguration dbConfiguration = ccmConfig.getSqlServerDatabaseConfig(countryCode1);
HikariDataSource ds = null;
try {
final String password = dbConfiguration.getDataBasePwd();
final String username = dbConfiguration.getDataBaseUserName();
ds = (HikariDataSource) DataSourceBuilder.create().type(HikariDataSource.class).build();
ds.setUsername(username);
ds.setPassword(password);
ds.setDriverClassName(dbConfiguration.getDataBaseDriver());
ds.setJdbcUrl(dbConfiguration.getDataBaseUrl());
ds.setConnectionTestQuery("SELECT 1");
ds.setConnectionTimeout(dbConfiguration.getConnectionTimeout());
ds.setIdleTimeout(dbConfiguration.getIdleTimeout());
ds.setMaximumPoolSize(dbConfiguration.getHikariPoolSize());
//ds.setMaximumPoolSize(2);
ds.setMaxLifetime(dbConfiguration.getMaxLifetime());
ds.setLeakDetectionThreshold(dbConfiguration.getLeakDetectionThreshold());
ds.setPoolName(dbConfiguration.getPoolName());
//this.datasourcesVal = ds;
} catch (Exception ex) {
spotlight.sendNotification(ex, "Critical: Failed to establish Database connection");
log.error(ex.getMessage());
} finally {
if (!Objects.nonNull(ds) || ds.isClosed()) {
ds.close();
spotlight.sendNotification(new NullPointerException("Primary data source creation failed"),
"Critical: Failed to establish Database connection");
}
}
return ds;

}


// NOTE(review): these two fields DUPLICATE the private datasourcesVal /
// datasourcesVal1 declared earlier in this class — two fields with the same
// name in one class is a compile error. Keep this @Qualifier-injected pair and
// delete the private holders (and the manual `this.datasourcesVal = ds;`
// assignment in hikariDataSource). Also prefer `private` over `public` here.
@Autowired
@Qualifier("batchprimary")
public DataSource datasourcesVal;

@Autowired
@Qualifier("batchprimary1")
public DataSource datasourcesVal1;
// Chunk-oriented step: reads AccumOnhand records, passes them through unchanged,
// and writes them via a composite JDBC writer, with retry/skip fault tolerance
// and a throttled task executor for parallel chunk processing.
// NOTE(review): method name should be lowerCamelCase (extractNLoadItemOnHand).
@Bean
@JobScope
public Step ExtractNLoadItemOnHand(TaskExecutor taskExecutor,@Value("#{jobParameters}") Map<String, JobParameter> jobParameters)
throws InvalidKeyException, IllegalBlockSizeException,
NoSuchPaddingException, BadPaddingException,
NoSuchAlgorithmException, SQLException, ValidationException,
CalpiDataException {
int chunkSize = ApplicationConstants.CHUNK_SIZE;
log.info(" Extract and Load ItemOnHand Job - Started....");
// NOTE(review): this calls JobParameter.toString(), not getValue() — verify it
// yields the bare country code and not a wrapped representation.
// Also mutates a shared instance field from a @JobScope bean: fine for one job
// at a time, but racy if jobs for different countries run concurrently — confirm.
countryCode = jobParameters.get("CountryCode").toString();
//dataSourceTemp = datasourcesMap.get(countryCode);
return stepBuilderFactory
.get("READ ITEM ONHAND STEP")
.listener(stepListners)
.<AccumOnhand, AccumOnhand> chunk(chunkSize)
.reader(extractItemOnHand(null))
// Pass-through processor: no transformation is applied between read and write.
.processor(new ItemProcessor<AccumOnhand, AccumOnhand>() {
@Override
public AccumOnhand process(AccumOnhand accumOnhand)
throws Exception {
return accumOnhand;
}
})
.writer(compositeItemWriter.compositeItemWriter(Arrays
.asList(loadItemOnHand()))).faultTolerant()
// NOTE(review): retry(Exception.class) + skip(Exception.class) retries/skips on
// EVERY exception — with 5M records this can silently mask data errors and,
// combined with retries, slow the step considerably.
.retryLimit(ApplicationConstants.RETRY_SKIP_LIMIT)
.retry(Exception.class).skip(Exception.class)
.skipLimit(ApplicationConstants.RETRY_SKIP_LIMIT)
.taskExecutor(taskExecutor)
.throttleLimit(ApplicationConstants.THROTTLE_LIMIT).build();
}

/**
 * Builds the JDBC batch writer for the ACCUM_ONHAND insert, selecting the data
 * source by country: {@code MX} uses the primary pool ({@code datasourcesVal}),
 * every other country the secondary pool ({@code datasourcesVal1}).
 *
 * @return a configured {@link JdbcBatchItemWriter} for {@code AccumOnhand} items
 * @throws ValidationException if the writer cannot be created (original cause attached)
 */
@Bean
@StepScope
public JdbcBatchItemWriter<AccumOnhand> loadItemOnHand()
throws InvalidKeyException, IllegalBlockSizeException,
NoSuchPaddingException, BadPaddingException,
NoSuchAlgorithmException, ValidationException, CalpiDataException {
    try {
        // Constant-first equals avoids an NPE if countryCode was never set
        // (it is assigned as a side effect of the step bean, not guaranteed here).
        DataSource targetDataSource =
                "MX".equals(countryCode) ? datasourcesVal : datasourcesVal1;
        // Single construction path instead of two duplicated branches.
        JdbcBatchItemWriter<AccumOnhand> writer = stepDBItemWriter.writeDBData(
                MessageFormat.format(SQLConstants.INS_ACCUM_ONHAND, countryCode),
                targetDataSource);
        log.info("Item on hand writer created successfully");
        return writer;
    } catch (Exception ex) {
        // BUG FIX: `log.error("... {} ", ex)` left the placeholder unfilled — when
        // the last argument is a Throwable, SLF4J treats it as the exception, so
        // the message must not contain a {} for it.
        log.error("Load loadItemOnHand Failed", ex);
        throw new ValidationException("Load loadItemOnHand Failed", ex);
    }
}

根据我使用spring批处理的经验给出一些建议。

  • 确保每个数据库具有相同的写性能
  • 你试过使用commitInterval()吗?
  • 尽可能使用JdbcItemWriter或任何基于JDBC的策略编写
  • 在运行时检查cpu利用率。如果大于90%,则速度会降低
  • Try/catch 块本身几乎不影响性能;真正拖慢速度的是频繁抛出并捕获异常——例如依赖 skip/retry 机制处理大量坏记录时,每条失败记录的异常构造和回滚都会显著增加耗时

相关内容

  • 没有找到相关文章

最新更新