以编程方式配置块容错的工作原理如下:
stepBuilders.get("step")
.<Partner,Partner>chunk(1)
.reader(reader())
.processor(processor())
.writer(writer())
.listener(logProcessListener())
.faultTolerant()
.skipLimit(10)
.skip(UnknownGenderException.class)
.listener(logSkipListener())
.build();
诀窍在于,通过添加"chunk",链切换到提供"faulttolerance"方法的SimpleStepBuilder。
我的问题是如何做到这一点,如果你只有一个微线程(没有阅读器,处理器,写入器)?
定义微线程的工作方式如下:
stepBuilders.get("step")
.tasklet(tasklet())
.build();
"tasklet"的用法切换到一个不提供"容错"方法的TaskletStepBuilder。因此,我看不出如何定义像skipLimit这样的属性。
任何想法?
Tasklet
没有要跳过的"项"的概念,因此容错仅对面向块的步骤有意义。我建议您在原始版本(1.1.0)中使用Spring Retry。现在发布了,只是,你有一些流畅的构建器选项和@Retryable
注释)。
根据@DaveSyer给出的指导,使用org.springframework.retry:spring-retry:1.1.0。释放这就是我最后的结果:
Tasklet tasklet = // whatever
ProxyFactory proxyFactory = new ProxyFactory(Tasklet, new SingletonTargetSource(tasklet));
proxyFactory.addAdvice(RetryInterceptorBuilder.stateless()
.maxAttempts(3)
.build());
Step step = stepBuilderFactory.get("taskName")
.tasklet((Tasklet)proxyFactory.proxy)
.build();
我唯一正在努力的是,如果我想吞下导致 回答你的问题,没有没有支持:-( 我真的很喜欢对Tasklet也有Spring批处理重试支持。老实说,我不明白他们为什么要这么做。从Tasklet调用一个不稳定的系统是一个有效的用例。我不想使用@Retryable,因为Spring Batch对重试的支持更清晰。所以我想到了这个解决方案: ItemWriter现在可以像Tasklet一样工作。它只被调用一次。 注意,你必须调用markDataAsRead(this.stepExecution);maxAttempts
超出后重试的异常。如果我将ExceptionHandler
添加到步骤中,则永远不会重试该步骤。我猜这意味着异常处理程序是在切入点内的,我觉得有点奇怪。//you need to have a bean in order to define the retry policies
@Configuration
@EnableAutoConfiguration
public class MyRetryBean{
@Autowired
private RetryProperties properties;
private RetryTemplate retryTemplate;
@Bean
public RetryTemplate initializeRetryTemplate() {
Map<Class<? extends Throwable>, Boolean> retryExceptions = Collections.singletonMap(IOException.class,
Boolean.TRUE);
SimpleRetryPolicy policy = new SimpleRetryPolicy(properties.getMaxAttempt(), retryExceptions);
retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(policy);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(properties.getBackoffPeriod());
retryTemplate.setBackOffPolicy(backOffPolicy);
return retryTemplate;
}
public RetryTemplate getRetryTemplate() {
return retryTemplate;
}
}
// in spring batch steps config
@Autowire
private MyRetryBean retryBean;
stepBuilders.get("step")
.tasklet(new MyTasklet(retryBean))
.build();
// in your tasklet
public class MyTasklet implements Tasklet{
private MyRetryBean retry;
public MyTasklet (MyRetryBean aRetry) {
this.retry= aRetry;
}
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws IOException {
retry.getRetryTemplate().execute(new RetryCallback<RepeatStatus, IOException>() {
@Override
public RepeatStatus doWithRetry(RetryContext context) throws IOException {
throw new IOException();
}
});
return null;
}
}
@Bean
public Step setInboxMessageStatusToReceivedStep(OneItemReader oneItemReader, SetInboxMessageStatusToReceivedItemWriter setInboxMessageStatusToReceivedItemWriter) {
return this.stepBuilders.get("setInboxMessageStatusToReceivedStep")
.<String, String>chunk(1)
.reader(oneItemReader)
.writer(setInboxMessageStatusToReceivedItemWriter)
.faultTolerant()
.backOffPolicy(milliSecondsBetweenRetiesPolicy(this.milliSecondsBetweenReties))
.retryLimit(this.retryLimit)
.retry(Throwable.class)
.build();
}
/**
* Spring Batch has some cool things like retry or backOffPolicy. These things are only available for ItemReader + ItemWriter and not for Tasklets.
* This is a nice Hack to use these Spring Batch features for Tasklets like thinks wrapped into a ItemReader + ItemWriter.
*/
public class OneItemReader implements ItemReader<String> {
public static final String ONE_ITEM_READER_EXECUTION_CONTEXT_KEY = "ONE_ITEM_READER_EXECUTION_CONTEXT_KEY";
public static final String JOB_SUCCESSFUL_FINISHED = "ONE_ITEM_READER_EXECUTION_CONTEXT_VALUE_JOB_SUCCESSFUL_FINISHED";
private StepExecution stepExecution;
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
@Override
public String read() {
String isFirstTimeRead = (String) this.stepExecution.getExecutionContext().get(ONE_ITEM_READER_EXECUTION_CONTEXT_KEY);
if (isFirstTimeRead == null) {
return "dummy value just to ensure the writer is invoked";
} else {
// null will stop the reading
return null;
}
}
/**
* As the last action in a Step, this method must be called. It tells the OneItemReader to stop returning Items.
*/
public static void markDataAsRead(StepExecution stepExecution) {
stepExecution.getExecutionContext().put(ONE_ITEM_READER_EXECUTION_CONTEXT_KEY, JOB_SUCCESSFUL_FINISHED);
}
}