如何以编程方式为spring微线程(而不是块)配置容错



以编程方式配置块容错的工作原理如下:

stepBuilders.get("step")
  .<Partner,Partner>chunk(1)
  .reader(reader())
  .processor(processor())
  .writer(writer())
  .listener(logProcessListener())
  .faultTolerant()
  .skipLimit(10)
  .skip(UnknownGenderException.class)
  .listener(logSkipListener())
  .build();

诀窍在于,通过添加"chunk",链切换到提供"faulttolerance"方法的SimpleStepBuilder。

我的问题是如何做到这一点,如果你只有一个微线程(没有阅读器,处理器,写入器)?

定义微线程的工作方式如下:

stepBuilders.get("step")
  .tasklet(tasklet())
  .build();

"tasklet"的用法切换到一个不提供"容错"方法的TaskletStepBuilder。因此,我看不出如何定义像skipLimit这样的属性。

任何想法?

Tasklet没有要跳过的"项"的概念,因此容错仅对面向块的步骤有意义。我建议您在原始版本(1.1.0)中使用Spring Retry。现在发布了,只是,你有一些流畅的构建器选项和@Retryable注释)。

根据@DaveSyer给出的指导,使用org.springframework.retry:spring-retry:1.1.0。释放这就是我最后的结果:

Tasklet tasklet = // whatever
ProxyFactory proxyFactory = new ProxyFactory(Tasklet, new SingletonTargetSource(tasklet));
proxyFactory.addAdvice(RetryInterceptorBuilder.stateless()
                                              .maxAttempts(3)
                                              .build());
Step step = stepBuilderFactory.get("taskName")
                              .tasklet((Tasklet)proxyFactory.proxy)
                              .build();

我唯一正在努力的是,如果我想吞下导致maxAttempts超出后重试的异常。如果我将ExceptionHandler添加到步骤中,则永远不会重试该步骤。我猜这意味着异常处理程序是在切入点内的,我觉得有点奇怪。

//you need to have a bean in order to define the retry policies        
            @Configuration
            @EnableAutoConfiguration
            public class MyRetryBean{
              @Autowired
              private RetryProperties properties;
              private RetryTemplate retryTemplate;
              @Bean
              public RetryTemplate initializeRetryTemplate() {
                Map<Class<? extends Throwable>, Boolean> retryExceptions = Collections.singletonMap(IOException.class,
                      Boolean.TRUE);
                SimpleRetryPolicy policy = new SimpleRetryPolicy(properties.getMaxAttempt(), retryExceptions);
                retryTemplate = new RetryTemplate();
                retryTemplate.setRetryPolicy(policy);
                FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
                backOffPolicy.setBackOffPeriod(properties.getBackoffPeriod());
                retryTemplate.setBackOffPolicy(backOffPolicy);
                return retryTemplate;
              }
              public RetryTemplate getRetryTemplate() {
                return retryTemplate;
              }
            }
// in spring batch steps config
            @Autowire
            private MyRetryBean retryBean;
            stepBuilders.get("step")
              .tasklet(new MyTasklet(retryBean))
              .build();
// in your tasklet
        public class MyTasklet implements Tasklet{
          private MyRetryBean retry;
          public MyTasklet (MyRetryBean aRetry) {
            this.retry= aRetry;
          }
          @Override
          public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws IOException {
            retry.getRetryTemplate().execute(new RetryCallback<RepeatStatus, IOException>() {
              @Override
              public RepeatStatus doWithRetry(RetryContext context) throws IOException {
                throw new IOException();
              }
            });
            return null;
          }
        }

回答你的问题,没有没有支持:-(

我真的很喜欢对Tasklet也有Spring批处理重试支持。老实说,我不明白他们为什么要这么做。从Tasklet调用一个不稳定的系统是一个有效的用例。我不想使用@Retryable,因为Spring Batch对重试的支持更清晰。所以我想到了这个解决方案:

  @Bean
    public Step setInboxMessageStatusToReceivedStep(OneItemReader oneItemReader, SetInboxMessageStatusToReceivedItemWriter setInboxMessageStatusToReceivedItemWriter) {
        return this.stepBuilders.get("setInboxMessageStatusToReceivedStep")
                .<String, String>chunk(1)
                .reader(oneItemReader)
                .writer(setInboxMessageStatusToReceivedItemWriter)
                .faultTolerant()
                .backOffPolicy(milliSecondsBetweenRetiesPolicy(this.milliSecondsBetweenReties))
                .retryLimit(this.retryLimit)
                .retry(Throwable.class)
                .build();
    }
/**
 * Spring Batch has some cool things like retry or backOffPolicy. These things are only available for ItemReader + ItemWriter and not for Tasklets.
 * This is a nice Hack to use these Spring Batch features for Tasklets like thinks wrapped into a ItemReader + ItemWriter.
 */
public class OneItemReader implements ItemReader<String> {
    public static final String ONE_ITEM_READER_EXECUTION_CONTEXT_KEY = "ONE_ITEM_READER_EXECUTION_CONTEXT_KEY";
    public static final String JOB_SUCCESSFUL_FINISHED = "ONE_ITEM_READER_EXECUTION_CONTEXT_VALUE_JOB_SUCCESSFUL_FINISHED";
    private StepExecution stepExecution;
    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
    @Override
    public String read() {
        String isFirstTimeRead = (String) this.stepExecution.getExecutionContext().get(ONE_ITEM_READER_EXECUTION_CONTEXT_KEY);
        if (isFirstTimeRead == null) {
            return "dummy value just to ensure the writer is invoked";
        } else {
            // null will stop the reading
            return null;
        }
    }
    /**
     * As the last action in a Step, this method must be called. It tells the OneItemReader to stop returning Items.
     */
    public static void markDataAsRead(StepExecution stepExecution) {
        stepExecution.getExecutionContext().put(ONE_ITEM_READER_EXECUTION_CONTEXT_KEY, JOB_SUCCESSFUL_FINISHED);
    }
}

ItemWriter现在可以像Tasklet一样工作。它只被调用一次。

注意,你必须调用markDataAsRead(this.stepExecution);

最新更新