带有速率限制的春季批处理分区



背景

我使用SpringBatch通过HTTP API从我们的客户站点获取数据。进度包括两个主要步骤:

  1. 从API获取总文档,然后使用可配置的页面大小计算总页面。每个页面将使用自定义Paritioner分配给一个分区步骤
  2. 分区步骤将发送一个请求,以获取数据页(文档列表(、处理并写入我们的存储

客户站点可能是";脆弱的">。他们可能有费率限制,或者他们的网站可能在一些繁重的请求后没有响应。

到目前为止我做了什么

我正在使用spring-retry重新运行一个由于速率限制或服务器错误而失败的请求。例如:

// the partition step's item reader
@StepScope
public class CustomItemReader extends ItemReader<Object> {
private List<Object> items;
@Override
public Object read() {
if (Objects.isNull(items)) {
this.items = ImportService.getPage(pageId);
}
if (Objects.nonNull(items) && !items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
// config retry for fetching function
public class ImportService {
@Retryable(
value = RetryableException.class,
maxAttempts = 3,
backoff = @Backoff(
delay = 1000
)
)
public static List<Object> getPage(String pageId) throws RetryableException {
return ...;
}
}

重试配置包含Backoff策略,该策略具有增量延迟(1000毫秒(。我使用这个Retryable来处理重试和速率限制。

问题

  1. Retryable将重复等待并重新执行函数,该函数将始终保持线程。当事情变得更大时,实例可能会崩溃
  2. 因为每个客户都有自己的费率限制,所以将RetryableBackoff一起使用并不是控制费率的理想方式。尽管我为每个客户站点配置了core_pool_size,但core_pool_size=1对某些站点来说是不够的

问题

  1. 是否有适当的方法来限制Spring Batch的执行率,尤其是分区?例如:我想配置为在10秒内发送2个请求,而这将无法通过在步骤监听器中使用sleep来实现
  2. 我已经为一些爬网程序使用了scrapy,它具有非常酷的重试和速率限制功能。使用RetryMiddleware,它将对失败的页面进行排队,并在设置中具有RETRY_LIMIT。使用AutoThrottle,它可以根据服务器上的负载自动调节速度。有什么方法可以在SpringBatch中实现这些功能吗?或者我必须用scrapy重写我的项目

非常感谢

Spring Batch不提供此类功能。但在步骤中,您可以在适当的情况下使用任何速率限制库(即读取数据之前/之后、处理或写入数据之前/以后等(。

这应该会有所帮助:Spring批处理写入程序节流。

最新更新