春季批处理:从 JMS 队列读取,步骤不会结束



我有一个简单的巴赫作业,它从JMS队列(ActiveMQ(中读取并写入文件。批处理作业按预期运行,并写入遵循已设置为 10,000 的提交间隔的文件。

在这方面有2点意见

  1. 批处理作业读取队列不会结束。

  2. 我看到队列中的所有消息都被消耗了,但只有当新消息被推送到 JMS 队列并且满足提交间隔时,最后一个块才会被写入文件。

这是预期的行为吗?我想安排批处理作业,并在该时间点使用和写入队列中存在的所有消息。有什么建议吗?

@Autowired
private JobBuilderFactory jobBuilderFactory;
@Bean
public TransactionAwareConnectionFactoryProxy activeMQConnectionFactory() {
    ActiveMQConnectionFactory amqConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
    TransactionAwareConnectionFactoryProxy activeMQConnectionFactory = new TransactionAwareConnectionFactoryProxy(amqConnectionFactory);
    return activeMQConnectionFactory;
}
@Bean
public ActiveMQQueue defaultQueue() {
    return new ActiveMQQueue("firstQueue");
}
@Bean
public PlatformTransactionManager transactionManager() {
    return new ResourcelessTransactionManager();
}
@Bean
public JobRepository jobRepository(PlatformTransactionManager transactionManager) throws Exception {
    return new MapJobRepositoryFactoryBean(transactionManager).getObject();
}
@Bean
@DependsOn("jobRepository")
public SimpleJobLauncher simpleJobLauncher(JobRepository jobRepository) {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    return simpleJobLauncher;
}   

如果我将接收超时设置为较小的数字,则不会消耗所有消息,从而设置为上限。

@Bean
@DependsOn(value = { "activeMQConnectionFactory", "defaultQueue" })
public JmsTemplate firstQueueTemplate(ActiveMQQueue defaultQueue, TransactionAwareConnectionFactoryProxy activeMQConnectionFactory) {
    JmsTemplate firstQueueTemplate = new JmsTemplate(activeMQConnectionFactory);
    firstQueueTemplate.setDefaultDestination(defaultQueue);
    firstQueueTemplate.setSessionTransacted(true);
    firstQueueTemplate.setReceiveTimeout(Long.MAX_VALUE);
    return firstQueueTemplate;
}

批处理作业的配置。

@Bean
public JmsItemReader<String> jmsItemReader(JmsTemplate firstQueueTemplate) {
    JmsItemReader<String> jmsItemReader = new JmsItemReader<>();
    jmsItemReader.setJmsTemplate(firstQueueTemplate);
    jmsItemReader.setItemType(String.class);
    return jmsItemReader;
}

@Bean
public ItemWriter<String> flatFileItemWriter() {
    FlatFileItemWriter<String> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource("/mypath/output.csv"));
    writer.setLineAggregator(new PassThroughLineAggregator<String>());
    return writer;
}
@Bean
@DependsOn(value = { "jmsItemReader", "jmsItemWriter", "jobRepository", "transactionManager" })
public Step queueReaderStep(JmsItemReader<String> jmsItemReader, ItemWriter<String> flatFileItemWriter, JobRepository jobRepository,
        PlatformTransactionManager transactionManager) throws Exception {
    StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);
    AbstractTaskletStepBuilder<SimpleStepBuilder<String, String>> step = stepBuilderFactory.get("queueReaderStep").<String, String> chunk(10000)
            .reader(jmsItemReader).writer(flatFileItemWriter);
    return step.build();
}

@Bean
@DependsOn(value = { "jobRepository", "queueReaderStep" })
public Job jsmReaderJob(JobRepository jobRepository, Step queueReaderStep) {
    return this.jobBuilderFactory.get("jsmReaderJob").repository(jobRepository).incrementer(new RunIdIncrementer())
            .flow(queueReaderStep).end().build();
}
Spring

Batch 提供的JmsItemReader实际上更像是一个模板或示例,因为正如您所注意到的,它永远不会返回null因此该步骤永远不会结束。 您需要编写一些内容来指示给定的消息指示该步骤已完成。

最新更新