发布消息时如何在Kafka Topic上配置Rate Limiting



我们使用Kafka来处理事件。我们有两个用例,它们会导致更多的事件。

  1. 有时我们会得到很多机器人流量
  2. 我们的一些客户正在频繁地产生更多的流量

这会导致处理其他客户事件的延迟。

所以我想增加一些基于客户的费率限制。根据我的研究,在向主题发布消息时,无法添加速率限制。

我们的应用程序是在Micronaut框架上开发的。

有没有办法在消费者层面增加限额?

在Kafka之上是否有任何与java相关的框架来实现速率限制?

使用kafka配额实施每位客户的限额将是一项挑战。相反,您可能想要查看resilience4j micronaut,并在生产商方面进行此限制。

您可以使用RateLimit注册表来创建和检索RateLimit实例:

RateLimiterConfig config = RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofMillis(1))
.limitForPeriod(10)
.timeoutDuration(Duration.ofMillis(25))
.build();
// Create registry
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);
// Use registry
RateLimiter rateLimiterWithDefaultConfig = rateLimiterRegistry
.rateLimiter("name1");
RateLimiter rateLimiterWithCustomConfig = rateLimiterRegistry
.rateLimiter("name2", config);

您可以使用RateLimit装饰任何CallableSupplierRunnableConsumerCheckedRunnableCheckedSupplierCheckedConsumerCompletionStage

// Decorate your call to BackendService.doSomething()
CheckedRunnable restrictedCall = RateLimiter
.decorateCheckedRunnable(rateLimiter, backendService::doSomething);
Try.run(restrictedCall)
.andThenTry(restrictedCall)
.onFailure((RequestNotPermitted throwable) -> LOG.info("Wait before call it again :)"));

对于每个客户或每个用户的节流,您需要每个实体的RateLimit实例进行限制:

LimiterManager limiterManager = new LimiterManager();
String customerNameUnique = "Acme123"; // // Get from current client request
final RateLimiter rateLimiter = limiterManager.getLimiter(customerNameUnique);
Runnable runnable = RateLimiter.decorateRunnable(rateLimiter, new Runnable() {
@Override
public void run() {
// TODO: Your code here, publishing events to kafka topic
}
});
Try.runRunnable(runnable).onFailure(
error -> System.out.print(error)
);
// Use a LimiterManager utility class to create / retrieve per-customer instances of RateLimiter
public static class LimiterManager {

final ConcurrentMap<String, RateLimiter> keyRateLimiters = new ConcurrentHashMap<String, RateLimiter>();
final RateLimiterConfig rateLimiterConfig = RateLimiterConfig.custom().timeoutDuration(Duration.ofMillis(100))
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(25) // Max 25 accesses per 1 second
.build();
public RateLimiter getLimiter(String entity) {
return keyRateLimiters.compute(entity, (key, limiter) -> {
return (limiter == null) ? RateLimiter.of(entity, rateLimiterConfig) : limiter;
});
}
}

请求节流可以通过代理上的配额来控制,然后客户端可以在内部理解配额,而无需更改代码(除了添加身份验证以指定客户端组(。

您正在询问速率限制,但您试图实现的是优先处理数据,不是吗?这不是一件容易的事情,因为这真的取决于你的流量,但我建议考虑两件事:

  1. 瓶颈是什么?它是消费者吗?背压是从哪里来的
  2. 你是针对同一主题创作的吗?也许你想把流量分成多个话题
  3. 我开发过一个有点类似的产品,并使用了Flink,它提供了为每个源(或路由(配置parallelism的能力,它为主数据流提供QoS,在较低优先级的传入数据出现峰值的情况下,它会对其处理造成背压,但不会影响较高优先级的流

最新更新